1use parking_lot::RwLock;
7use std::collections::HashMap;
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::Arc;
10use tokio::sync::mpsc;
11
12use crate::error::{Result, RingKernelError};
13use crate::hlc::HlcTimestamp;
14use crate::message::{MessageEnvelope, MessageId};
15use crate::runtime::KernelId;
16
17#[derive(Debug, Clone)]
19pub struct K2KConfig {
20 pub max_pending_messages: usize,
22 pub delivery_timeout_ms: u64,
24 pub enable_tracing: bool,
26 pub max_hops: u8,
28}
29
30impl Default for K2KConfig {
31 fn default() -> Self {
32 Self {
33 max_pending_messages: 1024,
34 delivery_timeout_ms: 5000,
35 enable_tracing: false,
36 max_hops: 8,
37 }
38 }
39}
40
41#[derive(Debug, Clone)]
43pub struct K2KMessage {
44 pub id: MessageId,
46 pub source: KernelId,
48 pub destination: KernelId,
50 pub envelope: MessageEnvelope,
52 pub hops: u8,
54 pub sent_at: HlcTimestamp,
56 pub priority: u8,
58}
59
60impl K2KMessage {
61 pub fn new(
63 source: KernelId,
64 destination: KernelId,
65 envelope: MessageEnvelope,
66 timestamp: HlcTimestamp,
67 ) -> Self {
68 Self {
69 id: MessageId::generate(),
70 source,
71 destination,
72 envelope,
73 hops: 0,
74 sent_at: timestamp,
75 priority: 0,
76 }
77 }
78
79 pub fn with_priority(mut self, priority: u8) -> Self {
81 self.priority = priority;
82 self
83 }
84
85 pub fn increment_hops(&mut self) -> Result<()> {
87 self.hops += 1;
88 if self.hops > 16 {
89 return Err(RingKernelError::K2KError(
90 "Maximum hop count exceeded".to_string(),
91 ));
92 }
93 Ok(())
94 }
95}
96
97#[derive(Debug, Clone)]
99pub struct DeliveryReceipt {
100 pub message_id: MessageId,
102 pub source: KernelId,
104 pub destination: KernelId,
106 pub status: DeliveryStatus,
108 pub timestamp: HlcTimestamp,
110}
111
112#[derive(Debug, Clone, Copy, PartialEq, Eq)]
114pub enum DeliveryStatus {
115 Delivered,
117 Pending,
119 NotFound,
121 QueueFull,
123 Timeout,
125 MaxHopsExceeded,
127}
128
129pub struct K2KEndpoint {
131 kernel_id: KernelId,
133 receiver: mpsc::Receiver<K2KMessage>,
135 broker: Arc<K2KBroker>,
137}
138
139impl K2KEndpoint {
140 pub async fn receive(&mut self) -> Option<K2KMessage> {
142 self.receiver.recv().await
143 }
144
145 pub fn try_receive(&mut self) -> Option<K2KMessage> {
147 self.receiver.try_recv().ok()
148 }
149
150 pub async fn send(
152 &self,
153 destination: KernelId,
154 envelope: MessageEnvelope,
155 ) -> Result<DeliveryReceipt> {
156 self.broker
157 .send(self.kernel_id.clone(), destination, envelope)
158 .await
159 }
160
161 pub async fn send_priority(
163 &self,
164 destination: KernelId,
165 envelope: MessageEnvelope,
166 priority: u8,
167 ) -> Result<DeliveryReceipt> {
168 self.broker
169 .send_priority(self.kernel_id.clone(), destination, envelope, priority)
170 .await
171 }
172
173 pub fn pending_count(&self) -> usize {
175 0 }
178}
179
180pub struct K2KBroker {
182 config: K2KConfig,
184 endpoints: RwLock<HashMap<KernelId, mpsc::Sender<K2KMessage>>>,
186 message_counter: AtomicU64,
188 receipts: RwLock<HashMap<MessageId, DeliveryReceipt>>,
190 routing_table: RwLock<HashMap<KernelId, KernelId>>,
192}
193
194impl K2KBroker {
195 pub fn new(config: K2KConfig) -> Arc<Self> {
197 Arc::new(Self {
198 config,
199 endpoints: RwLock::new(HashMap::new()),
200 message_counter: AtomicU64::new(0),
201 receipts: RwLock::new(HashMap::new()),
202 routing_table: RwLock::new(HashMap::new()),
203 })
204 }
205
206 pub fn register(self: &Arc<Self>, kernel_id: KernelId) -> K2KEndpoint {
208 let (sender, receiver) = mpsc::channel(self.config.max_pending_messages);
209
210 self.endpoints.write().insert(kernel_id.clone(), sender);
211
212 K2KEndpoint {
213 kernel_id,
214 receiver,
215 broker: Arc::clone(self),
216 }
217 }
218
219 pub fn unregister(&self, kernel_id: &KernelId) {
221 self.endpoints.write().remove(kernel_id);
222 self.routing_table.write().remove(kernel_id);
223 }
224
225 pub fn is_registered(&self, kernel_id: &KernelId) -> bool {
227 self.endpoints.read().contains_key(kernel_id)
228 }
229
230 pub fn registered_kernels(&self) -> Vec<KernelId> {
232 self.endpoints.read().keys().cloned().collect()
233 }
234
235 pub async fn send(
237 &self,
238 source: KernelId,
239 destination: KernelId,
240 envelope: MessageEnvelope,
241 ) -> Result<DeliveryReceipt> {
242 self.send_priority(source, destination, envelope, 0).await
243 }
244
245 pub async fn send_priority(
247 &self,
248 source: KernelId,
249 destination: KernelId,
250 envelope: MessageEnvelope,
251 priority: u8,
252 ) -> Result<DeliveryReceipt> {
253 let timestamp = envelope.header.timestamp;
254 let mut message = K2KMessage::new(source.clone(), destination.clone(), envelope, timestamp);
255 message.priority = priority;
256
257 self.deliver(message).await
258 }
259
260 async fn deliver(&self, message: K2KMessage) -> Result<DeliveryReceipt> {
262 let message_id = message.id;
263 let source = message.source.clone();
264 let destination = message.destination.clone();
265 let timestamp = message.sent_at;
266
267 let endpoints = self.endpoints.read();
269 if let Some(sender) = endpoints.get(&destination) {
270 match sender.try_send(message) {
271 Ok(()) => {
272 self.message_counter.fetch_add(1, Ordering::Relaxed);
273 let receipt = DeliveryReceipt {
274 message_id,
275 source,
276 destination,
277 status: DeliveryStatus::Delivered,
278 timestamp,
279 };
280 self.receipts.write().insert(message_id, receipt.clone());
281 return Ok(receipt);
282 }
283 Err(mpsc::error::TrySendError::Full(_)) => {
284 return Ok(DeliveryReceipt {
285 message_id,
286 source,
287 destination,
288 status: DeliveryStatus::QueueFull,
289 timestamp,
290 });
291 }
292 Err(mpsc::error::TrySendError::Closed(_)) => {
293 return Ok(DeliveryReceipt {
294 message_id,
295 source,
296 destination,
297 status: DeliveryStatus::NotFound,
298 timestamp,
299 });
300 }
301 }
302 }
303 drop(endpoints);
304
305 let next_hop = {
307 let routing = self.routing_table.read();
308 routing.get(&destination).cloned()
309 };
310
311 if let Some(next_hop) = next_hop {
312 let routed_message = K2KMessage {
313 id: message_id,
314 source,
315 destination: destination.clone(),
316 envelope: message.envelope,
317 hops: message.hops + 1,
318 sent_at: message.sent_at,
319 priority: message.priority,
320 };
321
322 if routed_message.hops > self.config.max_hops {
323 return Ok(DeliveryReceipt {
324 message_id,
325 source: routed_message.source,
326 destination,
327 status: DeliveryStatus::MaxHopsExceeded,
328 timestamp,
329 });
330 }
331
332 let endpoints = self.endpoints.read();
334 if let Some(sender) = endpoints.get(&next_hop) {
335 if sender.try_send(routed_message).is_ok() {
336 self.message_counter.fetch_add(1, Ordering::Relaxed);
337 return Ok(DeliveryReceipt {
338 message_id,
339 source: message.source,
340 destination,
341 status: DeliveryStatus::Pending,
342 timestamp,
343 });
344 }
345 }
346 }
347
348 Ok(DeliveryReceipt {
350 message_id,
351 source: message.source,
352 destination,
353 status: DeliveryStatus::NotFound,
354 timestamp,
355 })
356 }
357
358 pub fn add_route(&self, destination: KernelId, next_hop: KernelId) {
360 self.routing_table.write().insert(destination, next_hop);
361 }
362
363 pub fn remove_route(&self, destination: &KernelId) {
365 self.routing_table.write().remove(destination);
366 }
367
368 pub fn stats(&self) -> K2KStats {
370 K2KStats {
371 registered_endpoints: self.endpoints.read().len(),
372 messages_delivered: self.message_counter.load(Ordering::Relaxed),
373 routes_configured: self.routing_table.read().len(),
374 }
375 }
376
377 pub fn get_receipt(&self, message_id: &MessageId) -> Option<DeliveryReceipt> {
379 self.receipts.read().get(message_id).cloned()
380 }
381}
382
383#[derive(Debug, Clone, Default)]
385pub struct K2KStats {
386 pub registered_endpoints: usize,
388 pub messages_delivered: u64,
390 pub routes_configured: usize,
392}
393
394pub struct K2KBuilder {
396 config: K2KConfig,
397}
398
399impl K2KBuilder {
400 pub fn new() -> Self {
402 Self {
403 config: K2KConfig::default(),
404 }
405 }
406
407 pub fn max_pending_messages(mut self, count: usize) -> Self {
409 self.config.max_pending_messages = count;
410 self
411 }
412
413 pub fn delivery_timeout_ms(mut self, timeout: u64) -> Self {
415 self.config.delivery_timeout_ms = timeout;
416 self
417 }
418
419 pub fn enable_tracing(mut self, enable: bool) -> Self {
421 self.config.enable_tracing = enable;
422 self
423 }
424
425 pub fn max_hops(mut self, hops: u8) -> Self {
427 self.config.max_hops = hops;
428 self
429 }
430
431 pub fn build(self) -> Arc<K2KBroker> {
433 K2KBroker::new(self.config)
434 }
435}
436
437impl Default for K2KBuilder {
438 fn default() -> Self {
439 Self::new()
440 }
441}
442
443#[derive(Debug, Clone)]
465pub struct K2KMessageRegistration {
466 pub type_id: u64,
468 pub type_name: &'static str,
470 pub k2k_routable: bool,
472 pub category: Option<&'static str>,
474}
475
476inventory::collect!(K2KMessageRegistration);
478
479pub struct K2KTypeRegistry {
502 by_type_id: HashMap<u64, &'static K2KMessageRegistration>,
504 by_type_name: HashMap<&'static str, &'static K2KMessageRegistration>,
506 by_category: HashMap<&'static str, Vec<u64>>,
508}
509
510impl K2KTypeRegistry {
511 pub fn discover() -> Self {
516 let mut registry = Self {
517 by_type_id: HashMap::new(),
518 by_type_name: HashMap::new(),
519 by_category: HashMap::new(),
520 };
521
522 for reg in inventory::iter::<K2KMessageRegistration>() {
523 registry.by_type_id.insert(reg.type_id, reg);
524 registry.by_type_name.insert(reg.type_name, reg);
525 if let Some(cat) = reg.category {
526 registry
527 .by_category
528 .entry(cat)
529 .or_default()
530 .push(reg.type_id);
531 }
532 }
533
534 registry
535 }
536
537 pub fn is_routable(&self, type_id: u64) -> bool {
539 self.by_type_id
540 .get(&type_id)
541 .map(|r| r.k2k_routable)
542 .unwrap_or(false)
543 }
544
545 pub fn get(&self, type_id: u64) -> Option<&'static K2KMessageRegistration> {
547 self.by_type_id.get(&type_id).copied()
548 }
549
550 pub fn get_by_name(&self, type_name: &str) -> Option<&'static K2KMessageRegistration> {
552 self.by_type_name.get(type_name).copied()
553 }
554
555 pub fn get_category(&self, category: &str) -> &[u64] {
557 self.by_category
558 .get(category)
559 .map(|v| v.as_slice())
560 .unwrap_or(&[])
561 }
562
563 pub fn categories(&self) -> impl Iterator<Item = &'static str> + '_ {
565 self.by_category.keys().copied()
566 }
567
568 pub fn iter(&self) -> impl Iterator<Item = &'static K2KMessageRegistration> + '_ {
570 self.by_type_id.values().copied()
571 }
572
573 pub fn routable_types(&self) -> Vec<u64> {
575 self.by_type_id
576 .iter()
577 .filter(|(_, r)| r.k2k_routable)
578 .map(|(id, _)| *id)
579 .collect()
580 }
581
582 pub fn len(&self) -> usize {
584 self.by_type_id.len()
585 }
586
587 pub fn is_empty(&self) -> bool {
589 self.by_type_id.is_empty()
590 }
591}
592
593impl Default for K2KTypeRegistry {
594 fn default() -> Self {
595 Self::discover()
596 }
597}
598
599impl std::fmt::Debug for K2KTypeRegistry {
600 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
601 f.debug_struct("K2KTypeRegistry")
602 .field("registered_types", &self.by_type_id.len())
603 .field("categories", &self.by_category.keys().collect::<Vec<_>>())
604 .finish()
605 }
606}
607
608#[cfg(feature = "crypto")]
614#[derive(Debug, Clone)]
615pub struct K2KEncryptionConfig {
616 pub enabled: bool,
618 pub algorithm: K2KEncryptionAlgorithm,
620 pub forward_secrecy: bool,
622 pub key_rotation_interval_secs: u64,
624 pub require_encryption: bool,
626}
627
628#[cfg(feature = "crypto")]
629impl Default for K2KEncryptionConfig {
630 fn default() -> Self {
631 Self {
632 enabled: true,
633 algorithm: K2KEncryptionAlgorithm::Aes256Gcm,
634 forward_secrecy: true,
635 key_rotation_interval_secs: 3600, require_encryption: false,
637 }
638 }
639}
640
641#[cfg(feature = "crypto")]
642impl K2KEncryptionConfig {
643 pub fn disabled() -> Self {
645 Self {
646 enabled: false,
647 ..Default::default()
648 }
649 }
650
651 pub fn strict() -> Self {
653 Self {
654 enabled: true,
655 require_encryption: true,
656 forward_secrecy: true,
657 ..Default::default()
658 }
659 }
660}
661
662#[cfg(feature = "crypto")]
664#[derive(Debug, Clone, Copy, PartialEq, Eq)]
665pub enum K2KEncryptionAlgorithm {
666 Aes256Gcm,
668 ChaCha20Poly1305,
670}
671
672#[cfg(feature = "crypto")]
674pub struct K2KKeyMaterial {
675 kernel_id: KernelId,
677 long_term_key: [u8; 32],
679 session_key: parking_lot::RwLock<[u8; 32]>,
681 session_generation: std::sync::atomic::AtomicU64,
683 created_at: std::time::Instant,
685 last_rotated: parking_lot::RwLock<std::time::Instant>,
687}
688
689#[cfg(feature = "crypto")]
690impl K2KKeyMaterial {
691 pub fn new(kernel_id: KernelId) -> Self {
693 use rand::RngCore;
694 let mut rng = rand::thread_rng();
695
696 let mut long_term_key = [0u8; 32];
697 let mut session_key = [0u8; 32];
698 rng.fill_bytes(&mut long_term_key);
699 rng.fill_bytes(&mut session_key);
700
701 let now = std::time::Instant::now();
702 Self {
703 kernel_id,
704 long_term_key,
705 session_key: parking_lot::RwLock::new(session_key),
706 session_generation: std::sync::atomic::AtomicU64::new(1),
707 created_at: now,
708 last_rotated: parking_lot::RwLock::new(now),
709 }
710 }
711
712 pub fn from_key(kernel_id: KernelId, key: [u8; 32]) -> Self {
714 use rand::RngCore;
715 let mut rng = rand::thread_rng();
716
717 let mut session_key = [0u8; 32];
718 rng.fill_bytes(&mut session_key);
719
720 let now = std::time::Instant::now();
721 Self {
722 kernel_id,
723 long_term_key: key,
724 session_key: parking_lot::RwLock::new(session_key),
725 session_generation: std::sync::atomic::AtomicU64::new(1),
726 created_at: now,
727 last_rotated: parking_lot::RwLock::new(now),
728 }
729 }
730
731 pub fn kernel_id(&self) -> &KernelId {
733 &self.kernel_id
734 }
735
736 pub fn session_key(&self) -> [u8; 32] {
738 *self.session_key.read()
739 }
740
741 pub fn session_generation(&self) -> u64 {
743 self.session_generation
744 .load(std::sync::atomic::Ordering::Acquire)
745 }
746
747 pub fn rotate_session_key(&self) {
749 use rand::RngCore;
750 let mut rng = rand::thread_rng();
751
752 let mut new_key = [0u8; 32];
753 rng.fill_bytes(&mut new_key);
754
755 *self.session_key.write() = new_key;
756 self.session_generation
757 .fetch_add(1, std::sync::atomic::Ordering::AcqRel);
758 *self.last_rotated.write() = std::time::Instant::now();
759 }
760
761 pub fn derive_shared_secret(&self, dest_public_key: &[u8; 32]) -> [u8; 32] {
763 use sha2::{Digest, Sha256};
764
765 let mut hasher = Sha256::new();
768 hasher.update(&self.long_term_key);
769 hasher.update(dest_public_key);
770 hasher.update(b"k2k-shared-secret-v1");
771
772 let result = hasher.finalize();
773 let mut secret = [0u8; 32];
774 secret.copy_from_slice(&result);
775 secret
776 }
777
778 pub fn should_rotate(&self, interval_secs: u64) -> bool {
780 if interval_secs == 0 {
781 return false;
782 }
783 let elapsed = self.last_rotated.read().elapsed();
784 elapsed.as_secs() >= interval_secs
785 }
786
787 pub fn age(&self) -> std::time::Duration {
789 self.created_at.elapsed()
790 }
791}
792
793#[cfg(feature = "crypto")]
794impl Drop for K2KKeyMaterial {
795 fn drop(&mut self) {
796 use zeroize::Zeroize;
798 self.long_term_key.zeroize();
799 self.session_key.write().zeroize();
800 }
801}
802
803#[cfg(feature = "crypto")]
805#[derive(Debug, Clone)]
806pub struct EncryptedK2KMessage {
807 pub id: MessageId,
809 pub source: KernelId,
811 pub destination: KernelId,
813 pub hops: u8,
815 pub sent_at: HlcTimestamp,
817 pub priority: u8,
819 pub key_generation: u64,
821 pub nonce: [u8; 12],
823 pub ciphertext: Vec<u8>,
825 pub tag: [u8; 16],
827}
828
829#[cfg(feature = "crypto")]
831pub struct K2KEncryptor {
832 config: K2KEncryptionConfig,
834 key_material: K2KKeyMaterial,
836 peer_keys: parking_lot::RwLock<HashMap<KernelId, [u8; 32]>>,
838 stats: K2KEncryptionStats,
840}
841
842#[cfg(feature = "crypto")]
843impl K2KEncryptor {
844 pub fn new(kernel_id: KernelId, config: K2KEncryptionConfig) -> Self {
846 Self {
847 config,
848 key_material: K2KKeyMaterial::new(kernel_id),
849 peer_keys: parking_lot::RwLock::new(HashMap::new()),
850 stats: K2KEncryptionStats::default(),
851 }
852 }
853
854 pub fn with_key(kernel_id: KernelId, key: [u8; 32], config: K2KEncryptionConfig) -> Self {
856 Self {
857 config,
858 key_material: K2KKeyMaterial::from_key(kernel_id, key),
859 peer_keys: parking_lot::RwLock::new(HashMap::new()),
860 stats: K2KEncryptionStats::default(),
861 }
862 }
863
864 pub fn public_key(&self) -> [u8; 32] {
866 use sha2::{Digest, Sha256};
869 let mut hasher = Sha256::new();
870 hasher.update(&self.key_material.long_term_key);
871 hasher.update(b"k2k-public-key-v1");
872 let result = hasher.finalize();
873 let mut public = [0u8; 32];
874 public.copy_from_slice(&result);
875 public
876 }
877
878 pub fn register_peer(&self, kernel_id: KernelId, public_key: [u8; 32]) {
880 self.peer_keys.write().insert(kernel_id, public_key);
881 }
882
883 pub fn unregister_peer(&self, kernel_id: &KernelId) {
885 self.peer_keys.write().remove(kernel_id);
886 }
887
888 pub fn maybe_rotate(&self) {
890 if self
891 .key_material
892 .should_rotate(self.config.key_rotation_interval_secs)
893 {
894 self.key_material.rotate_session_key();
895 self.stats
896 .key_rotations
897 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
898 }
899 }
900
901 pub fn encrypt(&self, message: &K2KMessage) -> Result<EncryptedK2KMessage> {
903 if !self.config.enabled {
904 return Err(RingKernelError::K2KError(
905 "K2K encryption is disabled".to_string(),
906 ));
907 }
908
909 let peer_key = self
911 .peer_keys
912 .read()
913 .get(&message.destination)
914 .copied()
915 .ok_or_else(|| {
916 RingKernelError::K2KError(format!(
917 "No public key registered for destination kernel: {}",
918 message.destination
919 ))
920 })?;
921
922 let shared_secret = self.key_material.derive_shared_secret(&peer_key);
924 let session_key = if self.config.forward_secrecy {
925 use sha2::{Digest, Sha256};
927 let mut hasher = Sha256::new();
928 hasher.update(&shared_secret);
929 hasher.update(&self.key_material.session_key());
930 let result = hasher.finalize();
931 let mut key = [0u8; 32];
932 key.copy_from_slice(&result);
933 key
934 } else {
935 shared_secret
936 };
937
938 use rand::RngCore;
940 let mut nonce = [0u8; 12];
941 rand::thread_rng().fill_bytes(&mut nonce);
942
943 let envelope_bytes = message.envelope.to_bytes();
945
946 let (ciphertext, tag) = match self.config.algorithm {
948 K2KEncryptionAlgorithm::Aes256Gcm => {
949 use aes_gcm::{
950 aead::{Aead, KeyInit},
951 Aes256Gcm, Nonce,
952 };
953 let cipher = Aes256Gcm::new_from_slice(&session_key)
954 .map_err(|e| RingKernelError::K2KError(format!("AES init failed: {}", e)))?;
955
956 let nonce_obj = Nonce::from_slice(&nonce);
957 let ciphertext = cipher
958 .encrypt(nonce_obj, envelope_bytes.as_slice())
959 .map_err(|e| RingKernelError::K2KError(format!("Encryption failed: {}", e)))?;
960
961 let tag_start = ciphertext.len() - 16;
963 let mut tag = [0u8; 16];
964 tag.copy_from_slice(&ciphertext[tag_start..]);
965 (ciphertext[..tag_start].to_vec(), tag)
966 }
967 K2KEncryptionAlgorithm::ChaCha20Poly1305 => {
968 use chacha20poly1305::{
969 aead::{Aead, KeyInit},
970 ChaCha20Poly1305, Nonce,
971 };
972 let cipher = ChaCha20Poly1305::new_from_slice(&session_key)
973 .map_err(|e| RingKernelError::K2KError(format!("ChaCha init failed: {}", e)))?;
974
975 let nonce_obj = Nonce::from_slice(&nonce);
976 let ciphertext = cipher
977 .encrypt(nonce_obj, envelope_bytes.as_slice())
978 .map_err(|e| RingKernelError::K2KError(format!("Encryption failed: {}", e)))?;
979
980 let tag_start = ciphertext.len() - 16;
981 let mut tag = [0u8; 16];
982 tag.copy_from_slice(&ciphertext[tag_start..]);
983 (ciphertext[..tag_start].to_vec(), tag)
984 }
985 };
986
987 self.stats
988 .messages_encrypted
989 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
990 self.stats.bytes_encrypted.fetch_add(
991 envelope_bytes.len() as u64,
992 std::sync::atomic::Ordering::Relaxed,
993 );
994
995 Ok(EncryptedK2KMessage {
996 id: message.id,
997 source: message.source.clone(),
998 destination: message.destination.clone(),
999 hops: message.hops,
1000 sent_at: message.sent_at,
1001 priority: message.priority,
1002 key_generation: self.key_material.session_generation(),
1003 nonce,
1004 ciphertext,
1005 tag,
1006 })
1007 }
1008
1009 pub fn decrypt(&self, encrypted: &EncryptedK2KMessage) -> Result<K2KMessage> {
1011 if !self.config.enabled {
1012 return Err(RingKernelError::K2KError(
1013 "K2K encryption is disabled".to_string(),
1014 ));
1015 }
1016
1017 let peer_key = self
1019 .peer_keys
1020 .read()
1021 .get(&encrypted.source)
1022 .copied()
1023 .ok_or_else(|| {
1024 RingKernelError::K2KError(format!(
1025 "No public key registered for source kernel: {}",
1026 encrypted.source
1027 ))
1028 })?;
1029
1030 let shared_secret = self.key_material.derive_shared_secret(&peer_key);
1032 let session_key = if self.config.forward_secrecy {
1033 use sha2::{Digest, Sha256};
1034 let mut hasher = Sha256::new();
1035 hasher.update(&shared_secret);
1036 hasher.update(&self.key_material.session_key());
1037 let result = hasher.finalize();
1038 let mut key = [0u8; 32];
1039 key.copy_from_slice(&result);
1040 key
1041 } else {
1042 shared_secret
1043 };
1044
1045 let mut full_ciphertext = encrypted.ciphertext.clone();
1047 full_ciphertext.extend_from_slice(&encrypted.tag);
1048
1049 let plaintext = match self.config.algorithm {
1051 K2KEncryptionAlgorithm::Aes256Gcm => {
1052 use aes_gcm::{
1053 aead::{Aead, KeyInit},
1054 Aes256Gcm, Nonce,
1055 };
1056 let cipher = Aes256Gcm::new_from_slice(&session_key)
1057 .map_err(|e| RingKernelError::K2KError(format!("AES init failed: {}", e)))?;
1058
1059 let nonce = Nonce::from_slice(&encrypted.nonce);
1060 cipher
1061 .decrypt(nonce, full_ciphertext.as_slice())
1062 .map_err(|e| {
1063 self.stats
1064 .decryption_failures
1065 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1066 RingKernelError::K2KError(format!("Decryption failed: {}", e))
1067 })?
1068 }
1069 K2KEncryptionAlgorithm::ChaCha20Poly1305 => {
1070 use chacha20poly1305::{
1071 aead::{Aead, KeyInit},
1072 ChaCha20Poly1305, Nonce,
1073 };
1074 let cipher = ChaCha20Poly1305::new_from_slice(&session_key)
1075 .map_err(|e| RingKernelError::K2KError(format!("ChaCha init failed: {}", e)))?;
1076
1077 let nonce = Nonce::from_slice(&encrypted.nonce);
1078 cipher
1079 .decrypt(nonce, full_ciphertext.as_slice())
1080 .map_err(|e| {
1081 self.stats
1082 .decryption_failures
1083 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1084 RingKernelError::K2KError(format!("Decryption failed: {}", e))
1085 })?
1086 }
1087 };
1088
1089 let envelope = MessageEnvelope::from_bytes(&plaintext).map_err(|e| {
1091 RingKernelError::K2KError(format!("Envelope deserialization failed: {}", e))
1092 })?;
1093
1094 self.stats
1095 .messages_decrypted
1096 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1097 self.stats
1098 .bytes_decrypted
1099 .fetch_add(plaintext.len() as u64, std::sync::atomic::Ordering::Relaxed);
1100
1101 Ok(K2KMessage {
1102 id: encrypted.id,
1103 source: encrypted.source.clone(),
1104 destination: encrypted.destination.clone(),
1105 envelope,
1106 hops: encrypted.hops,
1107 sent_at: encrypted.sent_at,
1108 priority: encrypted.priority,
1109 })
1110 }
1111
1112 pub fn stats(&self) -> K2KEncryptionStatsSnapshot {
1114 K2KEncryptionStatsSnapshot {
1115 messages_encrypted: self
1116 .stats
1117 .messages_encrypted
1118 .load(std::sync::atomic::Ordering::Relaxed),
1119 messages_decrypted: self
1120 .stats
1121 .messages_decrypted
1122 .load(std::sync::atomic::Ordering::Relaxed),
1123 bytes_encrypted: self
1124 .stats
1125 .bytes_encrypted
1126 .load(std::sync::atomic::Ordering::Relaxed),
1127 bytes_decrypted: self
1128 .stats
1129 .bytes_decrypted
1130 .load(std::sync::atomic::Ordering::Relaxed),
1131 key_rotations: self
1132 .stats
1133 .key_rotations
1134 .load(std::sync::atomic::Ordering::Relaxed),
1135 decryption_failures: self
1136 .stats
1137 .decryption_failures
1138 .load(std::sync::atomic::Ordering::Relaxed),
1139 peer_count: self.peer_keys.read().len(),
1140 session_generation: self.key_material.session_generation(),
1141 }
1142 }
1143
1144 pub fn config(&self) -> &K2KEncryptionConfig {
1146 &self.config
1147 }
1148}
1149
1150#[cfg(feature = "crypto")]
1152#[derive(Default)]
1153struct K2KEncryptionStats {
1154 messages_encrypted: std::sync::atomic::AtomicU64,
1155 messages_decrypted: std::sync::atomic::AtomicU64,
1156 bytes_encrypted: std::sync::atomic::AtomicU64,
1157 bytes_decrypted: std::sync::atomic::AtomicU64,
1158 key_rotations: std::sync::atomic::AtomicU64,
1159 decryption_failures: std::sync::atomic::AtomicU64,
1160}
1161
1162#[cfg(feature = "crypto")]
1164#[derive(Debug, Clone, Default)]
1165pub struct K2KEncryptionStatsSnapshot {
1166 pub messages_encrypted: u64,
1168 pub messages_decrypted: u64,
1170 pub bytes_encrypted: u64,
1172 pub bytes_decrypted: u64,
1174 pub key_rotations: u64,
1176 pub decryption_failures: u64,
1178 pub peer_count: usize,
1180 pub session_generation: u64,
1182}
1183
1184#[cfg(feature = "crypto")]
1186pub struct EncryptedK2KEndpoint {
1187 inner: K2KEndpoint,
1189 encryptor: Arc<K2KEncryptor>,
1191}
1192
1193#[cfg(feature = "crypto")]
1194impl EncryptedK2KEndpoint {
1195 pub fn new(inner: K2KEndpoint, encryptor: Arc<K2KEncryptor>) -> Self {
1197 Self { inner, encryptor }
1198 }
1199
1200 pub fn public_key(&self) -> [u8; 32] {
1202 self.encryptor.public_key()
1203 }
1204
1205 pub fn register_peer(&self, kernel_id: KernelId, public_key: [u8; 32]) {
1207 self.encryptor.register_peer(kernel_id, public_key);
1208 }
1209
1210 pub async fn send_encrypted(
1212 &self,
1213 destination: KernelId,
1214 envelope: MessageEnvelope,
1215 ) -> Result<DeliveryReceipt> {
1216 self.encryptor.maybe_rotate();
1217
1218 let timestamp = envelope.header.timestamp;
1219 let message = K2KMessage::new(
1220 self.inner.kernel_id.clone(),
1221 destination.clone(),
1222 envelope,
1223 timestamp,
1224 );
1225
1226 let _encrypted = self.encryptor.encrypt(&message)?;
1228
1229 self.inner.send(destination, message.envelope).await
1232 }
1233
1234 pub async fn receive_decrypted(&mut self) -> Option<K2KMessage> {
1236 self.inner.receive().await
1237 }
1239
1240 pub fn encryption_stats(&self) -> K2KEncryptionStatsSnapshot {
1242 self.encryptor.stats()
1243 }
1244}
1245
1246#[cfg(feature = "crypto")]
1248pub struct EncryptedK2KBuilder {
1249 k2k_config: K2KConfig,
1250 encryption_config: K2KEncryptionConfig,
1251}
1252
1253#[cfg(feature = "crypto")]
1254impl EncryptedK2KBuilder {
1255 pub fn new() -> Self {
1257 Self {
1258 k2k_config: K2KConfig::default(),
1259 encryption_config: K2KEncryptionConfig::default(),
1260 }
1261 }
1262
1263 pub fn k2k_config(mut self, config: K2KConfig) -> Self {
1265 self.k2k_config = config;
1266 self
1267 }
1268
1269 pub fn encryption_config(mut self, config: K2KEncryptionConfig) -> Self {
1271 self.encryption_config = config;
1272 self
1273 }
1274
1275 pub fn with_forward_secrecy(mut self, enabled: bool) -> Self {
1277 self.encryption_config.forward_secrecy = enabled;
1278 self
1279 }
1280
1281 pub fn with_algorithm(mut self, algorithm: K2KEncryptionAlgorithm) -> Self {
1283 self.encryption_config.algorithm = algorithm;
1284 self
1285 }
1286
1287 pub fn with_key_rotation(mut self, interval_secs: u64) -> Self {
1289 self.encryption_config.key_rotation_interval_secs = interval_secs;
1290 self
1291 }
1292
1293 pub fn require_encryption(mut self, required: bool) -> Self {
1295 self.encryption_config.require_encryption = required;
1296 self
1297 }
1298
1299 pub fn build(self) -> (Arc<K2KBroker>, K2KEncryptionConfig) {
1301 (K2KBroker::new(self.k2k_config), self.encryption_config)
1302 }
1303}
1304
1305#[cfg(feature = "crypto")]
1306impl Default for EncryptedK2KBuilder {
1307 fn default() -> Self {
1308 Self::new()
1309 }
1310}
1311
1312#[cfg(test)]
1313mod tests {
1314 use super::*;
1315
1316 #[tokio::test]
1317 async fn test_k2k_broker_registration() {
1318 let broker = K2KBuilder::new().build();
1319
1320 let kernel1 = KernelId::new("kernel1");
1321 let kernel2 = KernelId::new("kernel2");
1322
1323 let _endpoint1 = broker.register(kernel1.clone());
1324 let _endpoint2 = broker.register(kernel2.clone());
1325
1326 assert!(broker.is_registered(&kernel1));
1327 assert!(broker.is_registered(&kernel2));
1328 assert_eq!(broker.registered_kernels().len(), 2);
1329 }
1330
1331 #[tokio::test]
1332 async fn test_k2k_message_delivery() {
1333 let broker = K2KBuilder::new().build();
1334
1335 let kernel1 = KernelId::new("kernel1");
1336 let kernel2 = KernelId::new("kernel2");
1337
1338 let endpoint1 = broker.register(kernel1.clone());
1339 let mut endpoint2 = broker.register(kernel2.clone());
1340
1341 let envelope = MessageEnvelope::empty(1, 2, HlcTimestamp::now(1));
1343
1344 let receipt = endpoint1.send(kernel2.clone(), envelope).await.unwrap();
1346 assert_eq!(receipt.status, DeliveryStatus::Delivered);
1347
1348 let message = endpoint2.try_receive();
1350 assert!(message.is_some());
1351 assert_eq!(message.unwrap().source, kernel1);
1352 }
1353
1354 #[test]
1355 fn test_k2k_config_default() {
1356 let config = K2KConfig::default();
1357 assert_eq!(config.max_pending_messages, 1024);
1358 assert_eq!(config.delivery_timeout_ms, 5000);
1359 }
1360
1361 #[cfg(feature = "crypto")]
1363 mod crypto_tests {
1364 use super::*;
1365
1366 #[test]
1367 fn test_k2k_encryption_config_default() {
1368 let config = K2KEncryptionConfig::default();
1369 assert!(config.enabled);
1370 assert!(config.forward_secrecy);
1371 assert_eq!(config.algorithm, K2KEncryptionAlgorithm::Aes256Gcm);
1372 assert_eq!(config.key_rotation_interval_secs, 3600);
1373 }
1374
1375 #[test]
1376 fn test_k2k_encryption_config_disabled() {
1377 let config = K2KEncryptionConfig::disabled();
1378 assert!(!config.enabled);
1379 }
1380
1381 #[test]
1382 fn test_k2k_encryption_config_strict() {
1383 let config = K2KEncryptionConfig::strict();
1384 assert!(config.enabled);
1385 assert!(config.require_encryption);
1386 assert!(config.forward_secrecy);
1387 }
1388
1389 #[test]
1390 fn test_k2k_key_material_creation() {
1391 let kernel_id = KernelId::new("test_kernel");
1392 let key_material = K2KKeyMaterial::new(kernel_id.clone());
1393
1394 assert_eq!(key_material.kernel_id(), &kernel_id);
1395 assert_eq!(key_material.session_generation(), 1);
1396 }
1397
1398 #[test]
1399 fn test_k2k_key_material_rotation() {
1400 let kernel_id = KernelId::new("test_kernel");
1401 let key_material = K2KKeyMaterial::new(kernel_id);
1402
1403 let old_session_key = key_material.session_key();
1404 let old_generation = key_material.session_generation();
1405
1406 key_material.rotate_session_key();
1407
1408 let new_session_key = key_material.session_key();
1409 let new_generation = key_material.session_generation();
1410
1411 assert_ne!(old_session_key, new_session_key);
1412 assert_eq!(new_generation, old_generation + 1);
1413 }
1414
1415 #[test]
1416 fn test_k2k_key_material_shared_secret() {
1417 let kernel1 = K2KKeyMaterial::new(KernelId::new("kernel1"));
1418 let kernel2 = K2KKeyMaterial::new(KernelId::new("kernel2"));
1419
1420 let pk1 = {
1422 use sha2::{Digest, Sha256};
1423 let mut hasher = Sha256::new();
1424 hasher.update(&kernel1.long_term_key);
1425 hasher.update(b"k2k-public-key-v1");
1426 let result = hasher.finalize();
1427 let mut public = [0u8; 32];
1428 public.copy_from_slice(&result);
1429 public
1430 };
1431 let pk2 = {
1432 use sha2::{Digest, Sha256};
1433 let mut hasher = Sha256::new();
1434 hasher.update(&kernel2.long_term_key);
1435 hasher.update(b"k2k-public-key-v1");
1436 let result = hasher.finalize();
1437 let mut public = [0u8; 32];
1438 public.copy_from_slice(&result);
1439 public
1440 };
1441
1442 let secret1 = kernel1.derive_shared_secret(&pk2);
1444 let secret2 = kernel2.derive_shared_secret(&pk1);
1445
1446 assert_eq!(secret1.len(), 32);
1449 assert_eq!(secret2.len(), 32);
1450 }
1451
1452 #[test]
1453 fn test_k2k_encryptor_creation() {
1454 let kernel_id = KernelId::new("test_kernel");
1455 let config = K2KEncryptionConfig::default();
1456 let encryptor = K2KEncryptor::new(kernel_id.clone(), config);
1457
1458 let public_key = encryptor.public_key();
1459 assert_eq!(public_key.len(), 32);
1460
1461 let stats = encryptor.stats();
1462 assert_eq!(stats.messages_encrypted, 0);
1463 assert_eq!(stats.messages_decrypted, 0);
1464 assert_eq!(stats.peer_count, 0);
1465 }
1466
1467 #[test]
1468 fn test_k2k_encryptor_peer_registration() {
1469 let kernel_id = KernelId::new("test_kernel");
1470 let config = K2KEncryptionConfig::default();
1471 let encryptor = K2KEncryptor::new(kernel_id, config);
1472
1473 let peer_id = KernelId::new("peer_kernel");
1474 let peer_key = [42u8; 32];
1475
1476 encryptor.register_peer(peer_id.clone(), peer_key);
1477 assert_eq!(encryptor.stats().peer_count, 1);
1478
1479 encryptor.unregister_peer(&peer_id);
1480 assert_eq!(encryptor.stats().peer_count, 0);
1481 }
1482
1483 #[test]
1484 fn test_k2k_encrypted_builder() {
1485 let (broker, config) = EncryptedK2KBuilder::new()
1486 .with_forward_secrecy(true)
1487 .with_algorithm(K2KEncryptionAlgorithm::ChaCha20Poly1305)
1488 .with_key_rotation(1800)
1489 .require_encryption(true)
1490 .build();
1491
1492 assert!(config.forward_secrecy);
1493 assert_eq!(config.algorithm, K2KEncryptionAlgorithm::ChaCha20Poly1305);
1494 assert_eq!(config.key_rotation_interval_secs, 1800);
1495 assert!(config.require_encryption);
1496
1497 let stats = broker.stats();
1499 assert_eq!(stats.registered_endpoints, 0);
1500 }
1501
1502 #[test]
1503 fn test_k2k_encryption_stats_snapshot() {
1504 let stats = K2KEncryptionStatsSnapshot::default();
1505 assert_eq!(stats.messages_encrypted, 0);
1506 assert_eq!(stats.messages_decrypted, 0);
1507 assert_eq!(stats.bytes_encrypted, 0);
1508 assert_eq!(stats.bytes_decrypted, 0);
1509 assert_eq!(stats.key_rotations, 0);
1510 assert_eq!(stats.decryption_failures, 0);
1511 assert_eq!(stats.peer_count, 0);
1512 assert_eq!(stats.session_generation, 0);
1513 }
1514
1515 #[test]
1516 fn test_k2k_encryption_algorithms() {
1517 assert_ne!(
1519 K2KEncryptionAlgorithm::Aes256Gcm,
1520 K2KEncryptionAlgorithm::ChaCha20Poly1305
1521 );
1522 }
1523
1524 #[test]
1525 fn test_k2k_key_material_should_rotate() {
1526 let kernel_id = KernelId::new("test_kernel");
1527 let key_material = K2KKeyMaterial::new(kernel_id);
1528
1529 assert!(!key_material.should_rotate(0));
1531
1532 assert!(!key_material.should_rotate(3600));
1534 }
1535
1536 #[test]
1537 fn test_k2k_encryptor_disabled_encryption() {
1538 let kernel_id = KernelId::new("test_kernel");
1539 let config = K2KEncryptionConfig::disabled();
1540 let encryptor = K2KEncryptor::new(kernel_id.clone(), config);
1541
1542 let envelope = MessageEnvelope::empty(1, 2, HlcTimestamp::now(1));
1544 let message = K2KMessage::new(
1545 kernel_id,
1546 KernelId::new("dest"),
1547 envelope,
1548 HlcTimestamp::now(1),
1549 );
1550
1551 let result = encryptor.encrypt(&message);
1553 assert!(result.is_err());
1554 }
1555
1556 #[test]
1557 fn test_k2k_encryptor_missing_peer_key() {
1558 let kernel_id = KernelId::new("test_kernel");
1559 let config = K2KEncryptionConfig::default();
1560 let encryptor = K2KEncryptor::new(kernel_id.clone(), config);
1561
1562 let envelope = MessageEnvelope::empty(1, 2, HlcTimestamp::now(1));
1564 let message = K2KMessage::new(
1565 kernel_id,
1566 KernelId::new("unknown_dest"),
1567 envelope,
1568 HlcTimestamp::now(1),
1569 );
1570
1571 let result = encryptor.encrypt(&message);
1573 assert!(result.is_err());
1574 assert!(result.unwrap_err().to_string().contains("No public key"));
1575 }
1576 }
1577}