use parking_lot::RwLock;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::mpsc;
use crate::error::{Result, RingKernelError};
use crate::hlc::HlcTimestamp;
use crate::message::{MessageEnvelope, MessageId};
use crate::runtime::KernelId;
/// Tunable settings for the kernel-to-kernel (K2K) broker.
#[derive(Clone, Debug)]
pub struct K2KConfig {
    /// Capacity of the bounded channel created for every registered endpoint.
    pub max_pending_messages: usize,
    /// Delivery timeout in milliseconds.
    ///
    /// NOTE(review): not read by the broker code visible in this file.
    pub delivery_timeout_ms: u64,
    /// Tracing toggle.
    ///
    /// NOTE(review): not read by the broker code visible in this file.
    pub enable_tracing: bool,
    /// Highest hop count a routed message may carry; the broker reports
    /// `MaxHopsExceeded` for anything above it.
    pub max_hops: u8,
}

impl Default for K2KConfig {
    /// Returns the stock configuration: 1024 pending messages per endpoint,
    /// a 5000 ms delivery timeout, tracing off and at most 8 hops.
    fn default() -> Self {
        K2KConfig {
            max_pending_messages: 1_024,
            delivery_timeout_ms: 5_000,
            enable_tracing: false,
            max_hops: 8,
        }
    }
}
/// A message travelling from one kernel to another through the K2K broker.
#[derive(Debug, Clone)]
pub struct K2KMessage {
    /// Identifier generated when the message is created.
    pub id: MessageId,
    /// Kernel that sent the message.
    pub source: KernelId,
    /// Kernel the message is addressed to.
    pub destination: KernelId,
    /// The payload being carried.
    pub envelope: MessageEnvelope,
    /// Number of hops taken so far; starts at 0.
    pub hops: u8,
    /// Timestamp supplied by the sender at creation time.
    pub sent_at: HlcTimestamp,
    /// Priority value; starts at 0.
    pub priority: u8,
}

impl K2KMessage {
    /// Hop count above which [`K2KMessage::increment_hops`] reports an error.
    const HOP_LIMIT: u8 = 16;

    /// Builds a message with a freshly generated id, zero hops and priority 0.
    pub fn new(
        source: KernelId,
        destination: KernelId,
        envelope: MessageEnvelope,
        timestamp: HlcTimestamp,
    ) -> Self {
        Self {
            id: MessageId::generate(),
            source,
            destination,
            envelope,
            hops: 0,
            sent_at: timestamp,
            priority: 0,
        }
    }

    /// Returns the message with its priority replaced by `priority`.
    pub fn with_priority(mut self, priority: u8) -> Self {
        self.priority = priority;
        self
    }

    /// Records one more hop.
    ///
    /// # Errors
    ///
    /// Returns [`RingKernelError::K2KError`] when the hop count exceeds 16
    /// after the increment. The counter is still updated in that case.
    pub fn increment_hops(&mut self) -> Result<()> {
        // Saturate instead of `+= 1`: at `u8::MAX` a plain add panics in debug
        // builds and wraps to 0 in release builds, which would let a message
        // that is far over the limit pass the check below.
        self.hops = self.hops.saturating_add(1);
        if self.hops > Self::HOP_LIMIT {
            return Err(RingKernelError::K2KError(
                "Maximum hop count exceeded".to_string(),
            ));
        }
        Ok(())
    }
}
/// Outcome report the broker returns for every send attempt.
#[derive(Clone, Debug)]
pub struct DeliveryReceipt {
    /// Id of the message this receipt describes.
    pub message_id: MessageId,
    /// Kernel that sent the message.
    pub source: KernelId,
    /// Kernel the message was addressed to.
    pub destination: KernelId,
    /// What happened to the message.
    pub status: DeliveryStatus,
    /// The message's `sent_at` timestamp.
    pub timestamp: HlcTimestamp,
}
/// Result of a delivery attempt, as reported in a `DeliveryReceipt`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DeliveryStatus {
    /// The message was placed in the destination endpoint's queue.
    Delivered,
    /// The message was handed to a next-hop endpoint from the routing table.
    Pending,
    /// No usable endpoint or route was found for the destination.
    NotFound,
    /// The target queue had no free capacity.
    QueueFull,
    /// Delivery timed out.
    ///
    /// NOTE(review): never produced by the code visible in this file.
    Timeout,
    /// Routing would have pushed the hop count past the configured maximum.
    MaxHopsExceeded,
}
/// A kernel's handle on the broker: receives messages addressed to the kernel
/// and sends messages on its behalf.
pub struct K2KEndpoint {
    /// Id this endpoint was registered under; used as the source of every send.
    kernel_id: KernelId,
    /// Receiving half of the channel the broker delivers into.
    receiver: mpsc::Receiver<K2KMessage>,
    /// Broker that created this endpoint.
    broker: Arc<K2KBroker>,
}

impl K2KEndpoint {
    /// Waits for the next message; resolves to `None` once the channel is
    /// closed and drained.
    pub async fn receive(&mut self) -> Option<K2KMessage> {
        let next = self.receiver.recv().await;
        next
    }

    /// Returns a queued message if one is immediately available, else `None`.
    ///
    /// An empty queue and a closed channel are both reported as `None`.
    pub fn try_receive(&mut self) -> Option<K2KMessage> {
        match self.receiver.try_recv() {
            Ok(message) => Some(message),
            Err(_) => None,
        }
    }

    /// Sends `envelope` to `destination` with this endpoint's kernel as source.
    pub async fn send(
        &self,
        destination: KernelId,
        envelope: MessageEnvelope,
    ) -> Result<DeliveryReceipt> {
        let source = self.kernel_id.clone();
        self.broker.send(source, destination, envelope).await
    }

    /// Same as [`K2KEndpoint::send`], but tags the message with `priority`.
    pub async fn send_priority(
        &self,
        destination: KernelId,
        envelope: MessageEnvelope,
        priority: u8,
    ) -> Result<DeliveryReceipt> {
        let source = self.kernel_id.clone();
        self.broker
            .send_priority(source, destination, envelope, priority)
            .await
    }

    /// Number of messages waiting in this endpoint's queue.
    ///
    /// NOTE(review): placeholder. This always returns 0; the receiver's queue
    /// depth is not inspected.
    pub fn pending_count(&self) -> usize {
        0
    }
}
/// Central hub that delivers messages between registered kernel endpoints.
pub struct K2KBroker {
    /// Settings supplied at construction.
    config: K2KConfig,
    /// Sending half of each registered endpoint's channel, keyed by kernel id.
    endpoints: RwLock<HashMap<KernelId, mpsc::Sender<K2KMessage>>>,
    /// Count of messages successfully queued (direct or via a next hop).
    message_counter: AtomicU64,
    /// Receipts of directly delivered messages, keyed by message id.
    receipts: RwLock<HashMap<MessageId, DeliveryReceipt>>,
    /// Maps a destination kernel to the next-hop kernel that forwards for it.
    routing_table: RwLock<HashMap<KernelId, KernelId>>,
}
impl K2KBroker {
pub fn new(config: K2KConfig) -> Arc<Self> {
Arc::new(Self {
config,
endpoints: RwLock::new(HashMap::new()),
message_counter: AtomicU64::new(0),
receipts: RwLock::new(HashMap::new()),
routing_table: RwLock::new(HashMap::new()),
})
}
pub fn register(self: &Arc<Self>, kernel_id: KernelId) -> K2KEndpoint {
let (sender, receiver) = mpsc::channel(self.config.max_pending_messages);
self.endpoints.write().insert(kernel_id.clone(), sender);
K2KEndpoint {
kernel_id,
receiver,
broker: Arc::clone(self),
}
}
pub fn unregister(&self, kernel_id: &KernelId) {
self.endpoints.write().remove(kernel_id);
self.routing_table.write().remove(kernel_id);
}
pub fn is_registered(&self, kernel_id: &KernelId) -> bool {
self.endpoints.read().contains_key(kernel_id)
}
pub fn registered_kernels(&self) -> Vec<KernelId> {
self.endpoints.read().keys().cloned().collect()
}
pub async fn send(
&self,
source: KernelId,
destination: KernelId,
envelope: MessageEnvelope,
) -> Result<DeliveryReceipt> {
self.send_priority(source, destination, envelope, 0).await
}
pub async fn send_priority(
&self,
source: KernelId,
destination: KernelId,
envelope: MessageEnvelope,
priority: u8,
) -> Result<DeliveryReceipt> {
let timestamp = envelope.header.timestamp;
let mut message = K2KMessage::new(source.clone(), destination.clone(), envelope, timestamp);
message.priority = priority;
self.deliver(message).await
}
async fn deliver(&self, message: K2KMessage) -> Result<DeliveryReceipt> {
let message_id = message.id;
let source = message.source.clone();
let destination = message.destination.clone();
let timestamp = message.sent_at;
let endpoints = self.endpoints.read();
if let Some(sender) = endpoints.get(&destination) {
match sender.try_send(message) {
Ok(()) => {
self.message_counter.fetch_add(1, Ordering::Relaxed);
let receipt = DeliveryReceipt {
message_id,
source,
destination,
status: DeliveryStatus::Delivered,
timestamp,
};
self.receipts.write().insert(message_id, receipt.clone());
return Ok(receipt);
}
Err(mpsc::error::TrySendError::Full(_)) => {
return Ok(DeliveryReceipt {
message_id,
source,
destination,
status: DeliveryStatus::QueueFull,
timestamp,
});
}
Err(mpsc::error::TrySendError::Closed(_)) => {
return Ok(DeliveryReceipt {
message_id,
source,
destination,
status: DeliveryStatus::NotFound,
timestamp,
});
}
}
}
drop(endpoints);
let next_hop = {
let routing = self.routing_table.read();
routing.get(&destination).cloned()
};
if let Some(next_hop) = next_hop {
let routed_message = K2KMessage {
id: message_id,
source,
destination: destination.clone(),
envelope: message.envelope,
hops: message.hops + 1,
sent_at: message.sent_at,
priority: message.priority,
};
if routed_message.hops > self.config.max_hops {
return Ok(DeliveryReceipt {
message_id,
source: routed_message.source,
destination,
status: DeliveryStatus::MaxHopsExceeded,
timestamp,
});
}
let endpoints = self.endpoints.read();
if let Some(sender) = endpoints.get(&next_hop) {
if sender.try_send(routed_message).is_ok() {
self.message_counter.fetch_add(1, Ordering::Relaxed);
return Ok(DeliveryReceipt {
message_id,
source: message.source,
destination,
status: DeliveryStatus::Pending,
timestamp,
});
}
}
}
Ok(DeliveryReceipt {
message_id,
source: message.source,
destination,
status: DeliveryStatus::NotFound,
timestamp,
})
}
pub fn add_route(&self, destination: KernelId, next_hop: KernelId) {
self.routing_table.write().insert(destination, next_hop);
}
pub fn remove_route(&self, destination: &KernelId) {
self.routing_table.write().remove(destination);
}
pub fn stats(&self) -> K2KStats {
K2KStats {
registered_endpoints: self.endpoints.read().len(),
messages_delivered: self.message_counter.load(Ordering::Relaxed),
routes_configured: self.routing_table.read().len(),
}
}
pub fn get_receipt(&self, message_id: &MessageId) -> Option<DeliveryReceipt> {
self.receipts.read().get(message_id).cloned()
}
}
/// Point-in-time counters reported by `K2KBroker::stats`.
#[derive(Clone, Debug, Default)]
pub struct K2KStats {
    /// Number of endpoints currently registered.
    pub registered_endpoints: usize,
    /// Number of messages successfully queued so far.
    pub messages_delivered: u64,
    /// Number of entries in the routing table.
    pub routes_configured: usize,
}
pub struct K2KBuilder {
config: K2KConfig,
}
impl K2KBuilder {
pub fn new() -> Self {
Self {
config: K2KConfig::default(),
}
}
pub fn max_pending_messages(mut self, count: usize) -> Self {
self.config.max_pending_messages = count;
self
}
pub fn delivery_timeout_ms(mut self, timeout: u64) -> Self {
self.config.delivery_timeout_ms = timeout;
self
}
pub fn enable_tracing(mut self, enable: bool) -> Self {
self.config.enable_tracing = enable;
self
}
pub fn max_hops(mut self, hops: u8) -> Self {
self.config.max_hops = hops;
self
}
pub fn build(self) -> Arc<K2KBroker> {
K2KBroker::new(self.config)
}
}
impl Default for K2KBuilder {
fn default() -> Self {
Self::new()
}
}
/// Static description of a message type, looked up through `K2KTypeRegistry`.
#[derive(Clone, Debug)]
pub struct K2KMessageRegistration {
    /// Numeric identifier of the message type.
    pub type_id: u64,
    /// Name of the message type.
    pub type_name: &'static str,
    /// Whether the type is reported as routable over K2K.
    pub k2k_routable: bool,
    /// Optional category used to group type ids.
    pub category: Option<&'static str>,
}
// Makes `K2KMessageRegistration` collectable by the `inventory` crate so that
// `K2KTypeRegistry::discover` can walk every registered value with
// `inventory::iter`. Presumably the values are submitted elsewhere via
// `inventory::submit!` (not visible in this file).
inventory::collect!(K2KMessageRegistration);
/// Lookup tables over every `K2KMessageRegistration` known to `inventory`.
pub struct K2KTypeRegistry {
    /// Registration for each type id.
    by_type_id: HashMap<u64, &'static K2KMessageRegistration>,
    /// Registration for each type name.
    by_type_name: HashMap<&'static str, &'static K2KMessageRegistration>,
    /// Type ids grouped by category, in the order they were encountered.
    by_category: HashMap<&'static str, Vec<u64>>,
}

impl K2KTypeRegistry {
    /// Builds the registry by iterating over all collected registrations.
    ///
    /// When two registrations share a type id or name, the one seen later
    /// replaces the earlier one in the corresponding table.
    pub fn discover() -> Self {
        let mut by_type_id = HashMap::new();
        let mut by_type_name = HashMap::new();
        let mut by_category: HashMap<&'static str, Vec<u64>> = HashMap::new();

        for registration in inventory::iter::<K2KMessageRegistration>() {
            by_type_id.insert(registration.type_id, registration);
            by_type_name.insert(registration.type_name, registration);
            if let Some(category) = registration.category {
                by_category
                    .entry(category)
                    .or_default()
                    .push(registration.type_id);
            }
        }

        Self {
            by_type_id,
            by_type_name,
            by_category,
        }
    }

    /// Returns `true` only when `type_id` is registered and marked routable.
    pub fn is_routable(&self, type_id: u64) -> bool {
        match self.by_type_id.get(&type_id) {
            Some(registration) => registration.k2k_routable,
            None => false,
        }
    }

    /// Looks a registration up by type id.
    pub fn get(&self, type_id: u64) -> Option<&'static K2KMessageRegistration> {
        let found = self.by_type_id.get(&type_id);
        found.copied()
    }

    /// Looks a registration up by type name.
    pub fn get_by_name(&self, type_name: &str) -> Option<&'static K2KMessageRegistration> {
        let found = self.by_type_name.get(type_name);
        found.copied()
    }

    /// Returns the type ids registered under `category`, or an empty slice
    /// when the category is unknown.
    pub fn get_category(&self, category: &str) -> &[u64] {
        match self.by_category.get(category) {
            Some(ids) => ids.as_slice(),
            None => &[],
        }
    }

    /// Iterates over every category name, in unspecified order.
    pub fn categories(&self) -> impl Iterator<Item = &'static str> + '_ {
        self.by_category.keys().copied()
    }

    /// Iterates over every registration, in unspecified order.
    pub fn iter(&self) -> impl Iterator<Item = &'static K2KMessageRegistration> + '_ {
        self.by_type_id.values().copied()
    }

    /// Collects the ids of all registrations marked routable.
    pub fn routable_types(&self) -> Vec<u64> {
        let mut routable = Vec::new();
        for (type_id, registration) in self.by_type_id.iter() {
            if registration.k2k_routable {
                routable.push(*type_id);
            }
        }
        routable
    }

    /// Number of distinct type ids in the registry.
    pub fn len(&self) -> usize {
        self.by_type_id.len()
    }

    /// Returns `true` when no type is registered.
    pub fn is_empty(&self) -> bool {
        self.by_type_id.is_empty()
    }
}

impl Default for K2KTypeRegistry {
    /// Equivalent to [`K2KTypeRegistry::discover`].
    fn default() -> Self {
        K2KTypeRegistry::discover()
    }
}

impl std::fmt::Debug for K2KTypeRegistry {
    /// Prints the number of registered types and the category names rather
    /// than the full tables.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let type_count = self.by_type_id.len();
        let category_names = self.by_category.keys().collect::<Vec<_>>();
        f.debug_struct("K2KTypeRegistry")
            .field("registered_types", &type_count)
            .field("categories", &category_names)
            .finish()
    }
}
/// Settings controlling K2K message encryption.
#[cfg(feature = "crypto")]
#[derive(Clone, Debug)]
pub struct K2KEncryptionConfig {
    /// Master switch; `K2KEncryptor::encrypt` and `decrypt` fail when `false`.
    pub enabled: bool,
    /// AEAD cipher used for the payload.
    pub algorithm: K2KEncryptionAlgorithm,
    /// When `true`, the current session key is mixed into the message key.
    pub forward_secrecy: bool,
    /// Seconds between session-key rotations; 0 disables rotation.
    pub key_rotation_interval_secs: u64,
    /// Whether unencrypted traffic should be refused.
    ///
    /// NOTE(review): not read by the code visible in this file.
    pub require_encryption: bool,
}

#[cfg(feature = "crypto")]
impl Default for K2KEncryptionConfig {
    /// Encryption on with AES-256-GCM, forward secrecy on, hourly key
    /// rotation, and encryption not mandatory.
    fn default() -> Self {
        K2KEncryptionConfig {
            enabled: true,
            algorithm: K2KEncryptionAlgorithm::Aes256Gcm,
            forward_secrecy: true,
            key_rotation_interval_secs: 3_600,
            require_encryption: false,
        }
    }
}

#[cfg(feature = "crypto")]
impl K2KEncryptionConfig {
    /// The default configuration with encryption switched off.
    pub fn disabled() -> Self {
        let defaults = Self::default();
        Self {
            enabled: false,
            ..defaults
        }
    }

    /// The default configuration with encryption enabled, required, and
    /// forward secrecy on.
    pub fn strict() -> Self {
        let defaults = Self::default();
        Self {
            enabled: true,
            require_encryption: true,
            forward_secrecy: true,
            ..defaults
        }
    }
}
/// AEAD ciphers supported for K2K payload encryption.
#[cfg(feature = "crypto")]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum K2KEncryptionAlgorithm {
    /// AES-256 in GCM mode (via the `aes_gcm` crate).
    Aes256Gcm,
    /// ChaCha20-Poly1305 (via the `chacha20poly1305` crate).
    ChaCha20Poly1305,
}
/// Secret keys owned by one kernel: a fixed long-term key plus a rotating
/// session key. Both are wiped when the value is dropped.
#[cfg(feature = "crypto")]
pub struct K2KKeyMaterial {
    /// Kernel these keys belong to.
    kernel_id: KernelId,
    /// Long-term secret; never changes after construction.
    long_term_key: [u8; 32],
    /// Current session key, replaced on rotation.
    session_key: RwLock<[u8; 32]>,
    /// Starts at 1 and is incremented on every rotation.
    session_generation: AtomicU64,
    /// When this key material was created.
    created_at: std::time::Instant,
    /// When the session key was last replaced (creation time initially).
    last_rotated: RwLock<std::time::Instant>,
}

#[cfg(feature = "crypto")]
impl K2KKeyMaterial {
    /// Draws 32 random bytes from the thread-local RNG.
    fn random_key() -> [u8; 32] {
        use rand::RngCore;
        let mut key = [0u8; 32];
        rand::thread_rng().fill_bytes(&mut key);
        key
    }

    /// Creates key material with a random long-term key and a random session
    /// key.
    pub fn new(kernel_id: KernelId) -> Self {
        // The long-term key is drawn first, then `from_key` draws the session
        // key, so the RNG is consumed in the same order as before.
        let long_term_key = Self::random_key();
        Self::from_key(kernel_id, long_term_key)
    }

    /// Creates key material around a caller-supplied long-term key; the
    /// session key is still random.
    pub fn from_key(kernel_id: KernelId, key: [u8; 32]) -> Self {
        let session_key = Self::random_key();
        let now = std::time::Instant::now();
        Self {
            kernel_id,
            long_term_key: key,
            session_key: RwLock::new(session_key),
            session_generation: AtomicU64::new(1),
            created_at: now,
            last_rotated: RwLock::new(now),
        }
    }

    /// Id of the kernel owning these keys.
    pub fn kernel_id(&self) -> &KernelId {
        &self.kernel_id
    }

    /// Returns a copy of the current session key.
    pub fn session_key(&self) -> [u8; 32] {
        let guard = self.session_key.read();
        *guard
    }

    /// Returns the current session-key generation number.
    pub fn session_generation(&self) -> u64 {
        self.session_generation.load(Ordering::Acquire)
    }

    /// Replaces the session key with fresh random bytes, bumps the generation
    /// and records the rotation time.
    pub fn rotate_session_key(&self) {
        let fresh = Self::random_key();
        *self.session_key.write() = fresh;
        self.session_generation.fetch_add(1, Ordering::AcqRel);
        *self.last_rotated.write() = std::time::Instant::now();
    }

    /// Computes `SHA-256(long_term_key || dest_public_key || "k2k-shared-secret-v1")`.
    ///
    /// NOTE(review): each side hashes its *own* long-term key with the peer's
    /// public key, so two peers feed different inputs into the hash and will
    /// generally not arrive at the same value. The unit test for this function
    /// only checks output lengths. Confirm whether a real key agreement is
    /// intended here.
    pub fn derive_shared_secret(&self, dest_public_key: &[u8; 32]) -> [u8; 32] {
        use sha2::{Digest, Sha256};
        let mut hasher = Sha256::new();
        hasher.update(&self.long_term_key);
        hasher.update(dest_public_key);
        hasher.update(b"k2k-shared-secret-v1");
        let digest = hasher.finalize();
        let mut secret = [0u8; 32];
        secret.copy_from_slice(&digest);
        secret
    }

    /// Returns `true` when at least `interval_secs` whole seconds have passed
    /// since the last rotation. An interval of 0 never triggers rotation.
    pub fn should_rotate(&self, interval_secs: u64) -> bool {
        interval_secs != 0 && self.last_rotated.read().elapsed().as_secs() >= interval_secs
    }

    /// Time elapsed since this key material was created.
    pub fn age(&self) -> std::time::Duration {
        self.created_at.elapsed()
    }
}

#[cfg(feature = "crypto")]
impl Drop for K2KKeyMaterial {
    /// Overwrites both keys with zeros before the memory is released.
    fn drop(&mut self) {
        use zeroize::Zeroize;
        self.long_term_key.zeroize();
        // `&mut self` guarantees exclusive access, so the lock itself does not
        // need to be taken.
        self.session_key.get_mut().zeroize();
    }
}
/// A `K2KMessage` whose envelope has been replaced by AEAD ciphertext.
///
/// The routing fields are copied from the plaintext message unchanged.
#[cfg(feature = "crypto")]
#[derive(Clone, Debug)]
pub struct EncryptedK2KMessage {
    /// Id of the original message.
    pub id: MessageId,
    /// Sending kernel.
    pub source: KernelId,
    /// Receiving kernel.
    pub destination: KernelId,
    /// Hop count of the original message.
    pub hops: u8,
    /// Send timestamp of the original message.
    pub sent_at: HlcTimestamp,
    /// Priority of the original message.
    pub priority: u8,
    /// Sender's session-key generation at encryption time.
    pub key_generation: u64,
    /// Random 12-byte nonce used for this message.
    pub nonce: [u8; 12],
    /// Encrypted envelope bytes, without the authentication tag.
    pub ciphertext: Vec<u8>,
    /// 16-byte authentication tag produced by the cipher.
    pub tag: [u8; 16],
}
/// Encrypts and decrypts `K2KMessage` envelopes on behalf of one kernel.
#[cfg(feature = "crypto")]
pub struct K2KEncryptor {
    /// Encryption settings.
    config: K2KEncryptionConfig,
    /// This kernel's own keys.
    key_material: K2KKeyMaterial,
    /// Public keys of known peers, keyed by kernel id.
    peer_keys: RwLock<HashMap<KernelId, [u8; 32]>>,
    /// Running counters exposed through `stats()`.
    stats: K2KEncryptionStats,
}
#[cfg(feature = "crypto")]
impl K2KEncryptor {
    /// Size in bytes of the authentication tag both supported AEADs append to
    /// their output.
    const TAG_LEN: usize = 16;

    /// Creates an encryptor with freshly generated key material.
    pub fn new(kernel_id: KernelId, config: K2KEncryptionConfig) -> Self {
        Self::from_material(K2KKeyMaterial::new(kernel_id), config)
    }

    /// Creates an encryptor whose long-term key is `key`.
    pub fn with_key(kernel_id: KernelId, key: [u8; 32], config: K2KEncryptionConfig) -> Self {
        Self::from_material(K2KKeyMaterial::from_key(kernel_id, key), config)
    }

    /// Shared constructor: no peers registered, all counters at zero.
    fn from_material(key_material: K2KKeyMaterial, config: K2KEncryptionConfig) -> Self {
        Self {
            config,
            key_material,
            peer_keys: RwLock::new(HashMap::new()),
            stats: K2KEncryptionStats::default(),
        }
    }

    /// Returns `SHA-256(long_term_key || "k2k-public-key-v1")`, the value
    /// peers are expected to pass to [`K2KEncryptor::register_peer`].
    pub fn public_key(&self) -> [u8; 32] {
        use sha2::{Digest, Sha256};
        let mut hasher = Sha256::new();
        hasher.update(&self.key_material.long_term_key);
        hasher.update(b"k2k-public-key-v1");
        let digest = hasher.finalize();
        let mut public = [0u8; 32];
        public.copy_from_slice(&digest);
        public
    }

    /// Stores (or replaces) the public key for `kernel_id`.
    pub fn register_peer(&self, kernel_id: KernelId, public_key: [u8; 32]) {
        self.peer_keys.write().insert(kernel_id, public_key);
    }

    /// Forgets the public key for `kernel_id`, if present.
    pub fn unregister_peer(&self, kernel_id: &KernelId) {
        self.peer_keys.write().remove(kernel_id);
    }

    /// Rotates the session key when the configured interval has elapsed and
    /// counts the rotation.
    pub fn maybe_rotate(&self) {
        let interval = self.config.key_rotation_interval_secs;
        if self.key_material.should_rotate(interval) {
            self.key_material.rotate_session_key();
            self.stats.key_rotations.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Encrypts the envelope of `message` for its destination kernel.
    ///
    /// A random 12-byte nonce is generated per call. The routing fields of the
    /// message are copied into the result in clear.
    ///
    /// # Errors
    ///
    /// Returns [`RingKernelError::K2KError`] when encryption is disabled, when
    /// no public key is registered for the destination, or when the cipher
    /// reports a failure.
    pub fn encrypt(&self, message: &K2KMessage) -> Result<EncryptedK2KMessage> {
        self.ensure_enabled()?;
        let peer_key = self.lookup_peer_key(&message.destination, "destination")?;
        let key = self.message_key(&peer_key);

        use rand::RngCore;
        let mut nonce = [0u8; 12];
        rand::thread_rng().fill_bytes(&mut nonce);

        let envelope_bytes = message.envelope.to_bytes();
        // Both ciphers return the ciphertext with the tag appended.
        let sealed = match self.config.algorithm {
            K2KEncryptionAlgorithm::Aes256Gcm => {
                use aes_gcm::{
                    aead::{Aead, KeyInit},
                    Aes256Gcm, Nonce,
                };
                let cipher = Aes256Gcm::new_from_slice(&key)
                    .map_err(|e| RingKernelError::K2KError(format!("AES init failed: {}", e)))?;
                cipher
                    .encrypt(Nonce::from_slice(&nonce), envelope_bytes.as_slice())
                    .map_err(|e| RingKernelError::K2KError(format!("Encryption failed: {}", e)))?
            }
            K2KEncryptionAlgorithm::ChaCha20Poly1305 => {
                use chacha20poly1305::{
                    aead::{Aead, KeyInit},
                    ChaCha20Poly1305, Nonce,
                };
                let cipher = ChaCha20Poly1305::new_from_slice(&key)
                    .map_err(|e| RingKernelError::K2KError(format!("ChaCha init failed: {}", e)))?;
                cipher
                    .encrypt(Nonce::from_slice(&nonce), envelope_bytes.as_slice())
                    .map_err(|e| RingKernelError::K2KError(format!("Encryption failed: {}", e)))?
            }
        };
        let (ciphertext, tag) = Self::split_tag(sealed)?;

        self.stats.messages_encrypted.fetch_add(1, Ordering::Relaxed);
        self.stats
            .bytes_encrypted
            .fetch_add(envelope_bytes.len() as u64, Ordering::Relaxed);

        Ok(EncryptedK2KMessage {
            id: message.id,
            source: message.source.clone(),
            destination: message.destination.clone(),
            hops: message.hops,
            sent_at: message.sent_at,
            priority: message.priority,
            key_generation: self.key_material.session_generation(),
            nonce,
            ciphertext,
            tag,
        })
    }

    /// Decrypts `encrypted` and rebuilds the original message.
    ///
    /// The key is derived from this encryptor's own key material and the
    /// public key registered for `encrypted.source`; `encrypted.key_generation`
    /// is not consulted.
    ///
    /// # Errors
    ///
    /// Returns [`RingKernelError::K2KError`] when encryption is disabled, when
    /// no public key is registered for the source, when authentication or
    /// decryption fails (also counted in `decryption_failures`), or when the
    /// plaintext cannot be parsed as an envelope.
    pub fn decrypt(&self, encrypted: &EncryptedK2KMessage) -> Result<K2KMessage> {
        self.ensure_enabled()?;
        let peer_key = self.lookup_peer_key(&encrypted.source, "source")?;
        let key = self.message_key(&peer_key);

        // The AEAD APIs expect ciphertext and tag as one contiguous buffer.
        let mut sealed = Vec::with_capacity(encrypted.ciphertext.len() + encrypted.tag.len());
        sealed.extend_from_slice(&encrypted.ciphertext);
        sealed.extend_from_slice(&encrypted.tag);

        let plaintext = match self.config.algorithm {
            K2KEncryptionAlgorithm::Aes256Gcm => {
                use aes_gcm::{
                    aead::{Aead, KeyInit},
                    Aes256Gcm, Nonce,
                };
                let cipher = Aes256Gcm::new_from_slice(&key)
                    .map_err(|e| RingKernelError::K2KError(format!("AES init failed: {}", e)))?;
                cipher
                    .decrypt(Nonce::from_slice(&encrypted.nonce), sealed.as_slice())
                    .map_err(|e| self.decryption_failed(e))?
            }
            K2KEncryptionAlgorithm::ChaCha20Poly1305 => {
                use chacha20poly1305::{
                    aead::{Aead, KeyInit},
                    ChaCha20Poly1305, Nonce,
                };
                let cipher = ChaCha20Poly1305::new_from_slice(&key)
                    .map_err(|e| RingKernelError::K2KError(format!("ChaCha init failed: {}", e)))?;
                cipher
                    .decrypt(Nonce::from_slice(&encrypted.nonce), sealed.as_slice())
                    .map_err(|e| self.decryption_failed(e))?
            }
        };

        let envelope = MessageEnvelope::from_bytes(&plaintext).map_err(|e| {
            RingKernelError::K2KError(format!("Envelope deserialization failed: {}", e))
        })?;

        self.stats.messages_decrypted.fetch_add(1, Ordering::Relaxed);
        self.stats
            .bytes_decrypted
            .fetch_add(plaintext.len() as u64, Ordering::Relaxed);

        Ok(K2KMessage {
            id: encrypted.id,
            source: encrypted.source.clone(),
            destination: encrypted.destination.clone(),
            envelope,
            hops: encrypted.hops,
            sent_at: encrypted.sent_at,
            priority: encrypted.priority,
        })
    }

    /// Returns a snapshot of the counters, the peer count and the current
    /// session generation.
    pub fn stats(&self) -> K2KEncryptionStatsSnapshot {
        let counters = &self.stats;
        K2KEncryptionStatsSnapshot {
            messages_encrypted: counters.messages_encrypted.load(Ordering::Relaxed),
            messages_decrypted: counters.messages_decrypted.load(Ordering::Relaxed),
            bytes_encrypted: counters.bytes_encrypted.load(Ordering::Relaxed),
            bytes_decrypted: counters.bytes_decrypted.load(Ordering::Relaxed),
            key_rotations: counters.key_rotations.load(Ordering::Relaxed),
            decryption_failures: counters.decryption_failures.load(Ordering::Relaxed),
            peer_count: self.peer_keys.read().len(),
            session_generation: self.key_material.session_generation(),
        }
    }

    /// The configuration this encryptor was created with.
    pub fn config(&self) -> &K2KEncryptionConfig {
        &self.config
    }

    /// Fails with the "disabled" error unless encryption is switched on.
    fn ensure_enabled(&self) -> Result<()> {
        if self.config.enabled {
            Ok(())
        } else {
            Err(RingKernelError::K2KError(
                "K2K encryption is disabled".to_string(),
            ))
        }
    }

    /// Fetches the registered public key for `kernel_id`; `role` ("source" or
    /// "destination") only shapes the error message.
    fn lookup_peer_key(&self, kernel_id: &KernelId, role: &str) -> Result<[u8; 32]> {
        self.peer_keys
            .read()
            .get(kernel_id)
            .copied()
            .ok_or_else(|| {
                RingKernelError::K2KError(format!(
                    "No public key registered for {} kernel: {}",
                    role, kernel_id
                ))
            })
    }

    /// Derives the per-message cipher key: the shared secret itself, or with
    /// forward secrecy enabled `SHA-256(shared_secret || session_key)`.
    ///
    /// NOTE(review): the session key mixed in here is this kernel's own and is
    /// not transmitted by anything in this file, so it is unclear how a peer
    /// would derive the same key. Confirm the intended key-agreement scheme.
    fn message_key(&self, peer_key: &[u8; 32]) -> [u8; 32] {
        use sha2::{Digest, Sha256};
        let shared_secret = self.key_material.derive_shared_secret(peer_key);
        if !self.config.forward_secrecy {
            return shared_secret;
        }
        let mut hasher = Sha256::new();
        hasher.update(&shared_secret);
        hasher.update(&self.key_material.session_key());
        let digest = hasher.finalize();
        let mut key = [0u8; 32];
        key.copy_from_slice(&digest);
        key
    }

    /// Splits AEAD output into ciphertext and trailing 16-byte tag.
    ///
    /// Output shorter than a tag is reported as an error instead of
    /// underflowing the length subtraction.
    fn split_tag(mut sealed: Vec<u8>) -> Result<(Vec<u8>, [u8; 16])> {
        let tag_start = sealed.len().checked_sub(Self::TAG_LEN).ok_or_else(|| {
            RingKernelError::K2KError(
                "Encryption failed: output shorter than authentication tag".to_string(),
            )
        })?;
        let mut tag = [0u8; 16];
        tag.copy_from_slice(&sealed[tag_start..]);
        sealed.truncate(tag_start);
        Ok((sealed, tag))
    }

    /// Counts a decryption failure and wraps the cipher error.
    fn decryption_failed(&self, error: impl std::fmt::Display) -> RingKernelError {
        self.stats
            .decryption_failures
            .fetch_add(1, Ordering::Relaxed);
        RingKernelError::K2KError(format!("Decryption failed: {}", error))
    }
}
/// Live counters kept by a `K2KEncryptor`; all start at zero.
#[cfg(feature = "crypto")]
#[derive(Default)]
struct K2KEncryptionStats {
    /// Successful `encrypt` calls.
    messages_encrypted: AtomicU64,
    /// Successful `decrypt` calls.
    messages_decrypted: AtomicU64,
    /// Total plaintext bytes passed through `encrypt`.
    bytes_encrypted: AtomicU64,
    /// Total plaintext bytes recovered by `decrypt`.
    bytes_decrypted: AtomicU64,
    /// Session-key rotations triggered through `maybe_rotate`.
    key_rotations: AtomicU64,
    /// Cipher-level decryption failures.
    decryption_failures: AtomicU64,
}
/// Plain-value copy of an encryptor's counters at one point in time.
#[cfg(feature = "crypto")]
#[derive(Clone, Debug, Default)]
pub struct K2KEncryptionStatsSnapshot {
    /// Successful encryptions.
    pub messages_encrypted: u64,
    /// Successful decryptions.
    pub messages_decrypted: u64,
    /// Total plaintext bytes encrypted.
    pub bytes_encrypted: u64,
    /// Total plaintext bytes decrypted.
    pub bytes_decrypted: u64,
    /// Session-key rotations performed.
    pub key_rotations: u64,
    /// Cipher-level decryption failures.
    pub decryption_failures: u64,
    /// Number of peers with a registered public key.
    pub peer_count: usize,
    /// Session-key generation at snapshot time.
    pub session_generation: u64,
}
/// A `K2KEndpoint` paired with the encryptor for the same kernel.
#[cfg(feature = "crypto")]
pub struct EncryptedK2KEndpoint {
    /// Underlying endpoint used for the actual send and receive.
    inner: K2KEndpoint,
    /// Encryptor shared with whoever else needs this kernel's keys.
    encryptor: Arc<K2KEncryptor>,
}

#[cfg(feature = "crypto")]
impl EncryptedK2KEndpoint {
    /// Wraps `inner` together with `encryptor`.
    pub fn new(inner: K2KEndpoint, encryptor: Arc<K2KEncryptor>) -> Self {
        EncryptedK2KEndpoint { inner, encryptor }
    }

    /// This kernel's public key, as computed by the encryptor.
    pub fn public_key(&self) -> [u8; 32] {
        self.encryptor.public_key()
    }

    /// Registers a peer's public key with the encryptor.
    pub fn register_peer(&self, kernel_id: KernelId, public_key: [u8; 32]) {
        self.encryptor.register_peer(kernel_id, public_key);
    }

    /// Rotates the session key if due, runs the envelope through the
    /// encryptor, then sends it to `destination`.
    ///
    /// NOTE(review): the ciphertext produced here is discarded and the
    /// *plaintext* envelope is what goes through the broker. The encryption
    /// step therefore only validates that encryption is enabled and a peer key
    /// is registered, and updates the stats. Confirm whether the encrypted
    /// form is meant to be transmitted.
    ///
    /// # Errors
    ///
    /// Propagates any error from `K2KEncryptor::encrypt` or from the send.
    pub async fn send_encrypted(
        &self,
        destination: KernelId,
        envelope: MessageEnvelope,
    ) -> Result<DeliveryReceipt> {
        self.encryptor.maybe_rotate();

        let timestamp = envelope.header.timestamp;
        let source = self.inner.kernel_id.clone();
        let message = K2KMessage::new(source, destination, envelope, timestamp);
        let _unused_ciphertext = self.encryptor.encrypt(&message)?;

        // Take the destination and envelope back out of the message instead of
        // cloning them up front.
        let K2KMessage {
            destination,
            envelope,
            ..
        } = message;
        self.inner.send(destination, envelope).await
    }

    /// Waits for the next message on the underlying endpoint.
    ///
    /// NOTE(review): no decryption is performed here; the message is returned
    /// exactly as received.
    pub async fn receive_decrypted(&mut self) -> Option<K2KMessage> {
        let received = self.inner.receive().await;
        received
    }

    /// Snapshot of the encryptor's counters.
    pub fn encryption_stats(&self) -> K2KEncryptionStatsSnapshot {
        self.encryptor.stats()
    }
}
/// Builder that produces a broker together with an encryption configuration.
#[cfg(feature = "crypto")]
pub struct EncryptedK2KBuilder {
    /// Broker settings.
    k2k_config: K2KConfig,
    /// Encryption settings.
    encryption_config: K2KEncryptionConfig,
}

#[cfg(feature = "crypto")]
impl EncryptedK2KBuilder {
    /// Starts from the default broker and encryption configurations.
    pub fn new() -> Self {
        let k2k_config = K2KConfig::default();
        let encryption_config = K2KEncryptionConfig::default();
        Self {
            k2k_config,
            encryption_config,
        }
    }

    /// Replaces the whole broker configuration.
    pub fn k2k_config(self, config: K2KConfig) -> Self {
        Self {
            k2k_config: config,
            ..self
        }
    }

    /// Replaces the whole encryption configuration.
    pub fn encryption_config(self, config: K2KEncryptionConfig) -> Self {
        Self {
            encryption_config: config,
            ..self
        }
    }

    /// Turns forward secrecy on or off.
    pub fn with_forward_secrecy(mut self, enabled: bool) -> Self {
        let encryption = &mut self.encryption_config;
        encryption.forward_secrecy = enabled;
        self
    }

    /// Selects the AEAD cipher.
    pub fn with_algorithm(mut self, algorithm: K2KEncryptionAlgorithm) -> Self {
        let encryption = &mut self.encryption_config;
        encryption.algorithm = algorithm;
        self
    }

    /// Sets the session-key rotation interval in seconds.
    pub fn with_key_rotation(mut self, interval_secs: u64) -> Self {
        let encryption = &mut self.encryption_config;
        encryption.key_rotation_interval_secs = interval_secs;
        self
    }

    /// Sets the `require_encryption` flag.
    pub fn require_encryption(mut self, required: bool) -> Self {
        let encryption = &mut self.encryption_config;
        encryption.require_encryption = required;
        self
    }

    /// Creates the broker and hands back the encryption configuration
    /// alongside it.
    pub fn build(self) -> (Arc<K2KBroker>, K2KEncryptionConfig) {
        let Self {
            k2k_config,
            encryption_config,
        } = self;
        (K2KBroker::new(k2k_config), encryption_config)
    }
}

#[cfg(feature = "crypto")]
impl Default for EncryptedK2KBuilder {
    /// Equivalent to [`EncryptedK2KBuilder::new`].
    fn default() -> Self {
        EncryptedK2KBuilder::new()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Both registered kernels are visible through the broker's queries.
    #[tokio::test]
    async fn test_k2k_broker_registration() {
        let broker = K2KBuilder::new().build();
        let first = KernelId::new("kernel1");
        let second = KernelId::new("kernel2");

        let _first_endpoint = broker.register(first.clone());
        let _second_endpoint = broker.register(second.clone());

        assert!(broker.is_registered(&first));
        assert!(broker.is_registered(&second));
        assert_eq!(broker.registered_kernels().len(), 2);
    }

    /// A message sent to a registered kernel is delivered and can be read back.
    #[tokio::test]
    async fn test_k2k_message_delivery() {
        let broker = K2KBuilder::new().build();
        let sender_id = KernelId::new("kernel1");
        let receiver_id = KernelId::new("kernel2");
        let sender = broker.register(sender_id.clone());
        let mut receiver = broker.register(receiver_id.clone());

        let envelope = MessageEnvelope::empty(1, 2, HlcTimestamp::now(1));
        let receipt = sender.send(receiver_id.clone(), envelope).await.unwrap();
        assert_eq!(receipt.status, DeliveryStatus::Delivered);

        let received = receiver.try_receive();
        assert!(received.is_some());
        assert_eq!(received.unwrap().source, sender_id);
    }

    /// The default broker configuration has the documented values.
    #[test]
    fn test_k2k_config_default() {
        let defaults = K2KConfig::default();
        assert_eq!(defaults.max_pending_messages, 1024);
        assert_eq!(defaults.delivery_timeout_ms, 5000);
    }

    #[cfg(feature = "crypto")]
    mod crypto_tests {
        use super::*;

        /// Derives a public key the same way `K2KEncryptor::public_key` does.
        fn public_key_for(material: &K2KKeyMaterial) -> [u8; 32] {
            use sha2::{Digest, Sha256};
            let mut hasher = Sha256::new();
            hasher.update(&material.long_term_key);
            hasher.update(b"k2k-public-key-v1");
            let digest = hasher.finalize();
            let mut public = [0u8; 32];
            public.copy_from_slice(&digest);
            public
        }

        /// Builds an empty message from `source` to the kernel named `destination`.
        fn empty_message(source: KernelId, destination: &str) -> K2KMessage {
            let envelope = MessageEnvelope::empty(1, 2, HlcTimestamp::now(1));
            K2KMessage::new(
                source,
                KernelId::new(destination),
                envelope,
                HlcTimestamp::now(1),
            )
        }

        #[test]
        fn test_k2k_encryption_config_default() {
            let defaults = K2KEncryptionConfig::default();
            assert!(defaults.enabled);
            assert!(defaults.forward_secrecy);
            assert_eq!(defaults.algorithm, K2KEncryptionAlgorithm::Aes256Gcm);
            assert_eq!(defaults.key_rotation_interval_secs, 3600);
        }

        #[test]
        fn test_k2k_encryption_config_disabled() {
            let disabled = K2KEncryptionConfig::disabled();
            assert!(!disabled.enabled);
        }

        #[test]
        fn test_k2k_encryption_config_strict() {
            let strict = K2KEncryptionConfig::strict();
            assert!(strict.enabled);
            assert!(strict.require_encryption);
            assert!(strict.forward_secrecy);
        }

        #[test]
        fn test_k2k_key_material_creation() {
            let owner = KernelId::new("test_kernel");
            let material = K2KKeyMaterial::new(owner.clone());
            assert_eq!(material.kernel_id(), &owner);
            assert_eq!(material.session_generation(), 1);
        }

        #[test]
        fn test_k2k_key_material_rotation() {
            let material = K2KKeyMaterial::new(KernelId::new("test_kernel"));
            let key_before = material.session_key();
            let generation_before = material.session_generation();

            material.rotate_session_key();

            let key_after = material.session_key();
            let generation_after = material.session_generation();
            assert_ne!(key_before, key_after);
            assert_eq!(generation_after, generation_before + 1);
        }

        /// Only the output sizes are checked; the two secrets are not compared.
        #[test]
        fn test_k2k_key_material_shared_secret() {
            let first = K2KKeyMaterial::new(KernelId::new("kernel1"));
            let second = K2KKeyMaterial::new(KernelId::new("kernel2"));
            let first_public = public_key_for(&first);
            let second_public = public_key_for(&second);

            let secret_at_first = first.derive_shared_secret(&second_public);
            let secret_at_second = second.derive_shared_secret(&first_public);
            assert_eq!(secret_at_first.len(), 32);
            assert_eq!(secret_at_second.len(), 32);
        }

        #[test]
        fn test_k2k_encryptor_creation() {
            let owner = KernelId::new("test_kernel");
            let encryptor = K2KEncryptor::new(owner.clone(), K2KEncryptionConfig::default());

            let public_key = encryptor.public_key();
            assert_eq!(public_key.len(), 32);

            let snapshot = encryptor.stats();
            assert_eq!(snapshot.messages_encrypted, 0);
            assert_eq!(snapshot.messages_decrypted, 0);
            assert_eq!(snapshot.peer_count, 0);
        }

        #[test]
        fn test_k2k_encryptor_peer_registration() {
            let encryptor =
                K2KEncryptor::new(KernelId::new("test_kernel"), K2KEncryptionConfig::default());
            let peer = KernelId::new("peer_kernel");

            encryptor.register_peer(peer.clone(), [42u8; 32]);
            assert_eq!(encryptor.stats().peer_count, 1);

            encryptor.unregister_peer(&peer);
            assert_eq!(encryptor.stats().peer_count, 0);
        }

        #[test]
        fn test_k2k_encrypted_builder() {
            let builder = EncryptedK2KBuilder::new()
                .with_forward_secrecy(true)
                .with_algorithm(K2KEncryptionAlgorithm::ChaCha20Poly1305)
                .with_key_rotation(1800)
                .require_encryption(true);
            let (broker, encryption) = builder.build();

            assert!(encryption.forward_secrecy);
            assert_eq!(
                encryption.algorithm,
                K2KEncryptionAlgorithm::ChaCha20Poly1305
            );
            assert_eq!(encryption.key_rotation_interval_secs, 1800);
            assert!(encryption.require_encryption);

            let broker_stats = broker.stats();
            assert_eq!(broker_stats.registered_endpoints, 0);
        }

        #[test]
        fn test_k2k_encryption_stats_snapshot() {
            let snapshot = K2KEncryptionStatsSnapshot::default();
            assert_eq!(snapshot.messages_encrypted, 0);
            assert_eq!(snapshot.messages_decrypted, 0);
            assert_eq!(snapshot.bytes_encrypted, 0);
            assert_eq!(snapshot.bytes_decrypted, 0);
            assert_eq!(snapshot.key_rotations, 0);
            assert_eq!(snapshot.decryption_failures, 0);
            assert_eq!(snapshot.peer_count, 0);
            assert_eq!(snapshot.session_generation, 0);
        }

        #[test]
        fn test_k2k_encryption_algorithms() {
            assert_ne!(
                K2KEncryptionAlgorithm::Aes256Gcm,
                K2KEncryptionAlgorithm::ChaCha20Poly1305
            );
        }

        /// Freshly created keys are not due for rotation, and a zero interval
        /// never is.
        #[test]
        fn test_k2k_key_material_should_rotate() {
            let material = K2KKeyMaterial::new(KernelId::new("test_kernel"));
            assert!(!material.should_rotate(0));
            assert!(!material.should_rotate(3600));
        }

        #[test]
        fn test_k2k_encryptor_disabled_encryption() {
            let owner = KernelId::new("test_kernel");
            let encryptor = K2KEncryptor::new(owner.clone(), K2KEncryptionConfig::disabled());
            let message = empty_message(owner, "dest");

            let outcome = encryptor.encrypt(&message);
            assert!(outcome.is_err());
        }

        #[test]
        fn test_k2k_encryptor_missing_peer_key() {
            let owner = KernelId::new("test_kernel");
            let encryptor = K2KEncryptor::new(owner.clone(), K2KEncryptionConfig::default());
            let message = empty_message(owner, "unknown_dest");

            let outcome = encryptor.encrypt(&message);
            assert!(outcome.is_err());
            assert!(outcome.unwrap_err().to_string().contains("No public key"));
        }
    }
}