use parking_lot::RwLock;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::mpsc;
use crate::error::{Result, RingKernelError};
use crate::hlc::HlcTimestamp;
use crate::message::{MessageEnvelope, MessageId};
use crate::runtime::KernelId;
pub mod audit_tag;
pub mod tenant;
pub use audit_tag::AuditTag;
pub use tenant::{TenantId, TenantInfo, TenantQuota, TenantRegistry, UNSPECIFIED_TENANT};
#[derive(Debug, Clone)]
pub struct K2KConfig {
pub max_pending_messages: usize,
pub delivery_timeout_ms: u64,
pub enable_tracing: bool,
pub max_hops: u8,
}
impl Default for K2KConfig {
fn default() -> Self {
Self {
max_pending_messages: 1024,
delivery_timeout_ms: 5000,
enable_tracing: false,
max_hops: 8,
}
}
}
#[derive(Debug, Clone)]
pub struct K2KMessage {
pub id: MessageId,
pub source: KernelId,
pub destination: KernelId,
pub envelope: MessageEnvelope,
pub hops: u8,
pub sent_at: HlcTimestamp,
pub priority: u8,
}
impl K2KMessage {
pub fn new(
source: KernelId,
destination: KernelId,
envelope: MessageEnvelope,
timestamp: HlcTimestamp,
) -> Self {
Self {
id: MessageId::generate(),
source,
destination,
envelope,
hops: 0,
sent_at: timestamp,
priority: 0,
}
}
pub fn with_priority(mut self, priority: u8) -> Self {
self.priority = priority;
self
}
pub fn increment_hops(&mut self) -> Result<()> {
self.hops += 1;
if self.hops > 16 {
return Err(RingKernelError::K2KError(
"Maximum hop count exceeded".to_string(),
));
}
Ok(())
}
}
#[derive(Debug, Clone)]
pub struct DeliveryReceipt {
pub message_id: MessageId,
pub source: KernelId,
pub destination: KernelId,
pub status: DeliveryStatus,
pub timestamp: HlcTimestamp,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DeliveryStatus {
Delivered,
Pending,
NotFound,
QueueFull,
Timeout,
MaxHopsExceeded,
TenantMismatch,
}
pub struct K2KEndpoint {
pub(crate) kernel_id: KernelId,
receiver: mpsc::Receiver<K2KMessage>,
broker: Arc<K2KBroker>,
}
impl K2KEndpoint {
pub async fn receive(&mut self) -> Option<K2KMessage> {
self.receiver.recv().await
}
pub fn try_receive(&mut self) -> Option<K2KMessage> {
self.receiver.try_recv().ok()
}
pub async fn send(
&self,
destination: KernelId,
envelope: MessageEnvelope,
) -> Result<DeliveryReceipt> {
self.broker
.send(self.kernel_id.clone(), destination, envelope)
.await
}
pub async fn send_priority(
&self,
destination: KernelId,
envelope: MessageEnvelope,
priority: u8,
) -> Result<DeliveryReceipt> {
self.broker
.send_priority(self.kernel_id.clone(), destination, envelope, priority)
.await
}
pub fn pending_count(&self) -> usize {
0 }
}
pub struct K2KSubBroker {
tenant_id: TenantId,
endpoints: RwLock<HashMap<KernelId, mpsc::Sender<K2KMessage>>>,
routing_table: RwLock<HashMap<KernelId, KernelId>>,
kernel_audit_tags: RwLock<HashMap<KernelId, AuditTag>>,
messages_delivered: AtomicU64,
}
impl K2KSubBroker {
fn new(tenant_id: TenantId) -> Self {
Self {
tenant_id,
endpoints: RwLock::new(HashMap::new()),
routing_table: RwLock::new(HashMap::new()),
kernel_audit_tags: RwLock::new(HashMap::new()),
messages_delivered: AtomicU64::new(0),
}
}
pub fn tenant_id(&self) -> TenantId {
self.tenant_id
}
pub fn endpoint_count(&self) -> usize {
self.endpoints.read().len()
}
pub fn messages_delivered(&self) -> u64 {
self.messages_delivered.load(Ordering::Relaxed)
}
pub fn knows(&self, kernel_id: &KernelId) -> bool {
self.endpoints.read().contains_key(kernel_id)
|| self.routing_table.read().contains_key(kernel_id)
}
pub fn audit_tag_for(&self, kernel_id: &KernelId) -> AuditTag {
self.kernel_audit_tags
.read()
.get(kernel_id)
.copied()
.unwrap_or_else(AuditTag::unspecified)
}
}
pub struct K2KBroker {
config: K2KConfig,
tenants: RwLock<HashMap<TenantId, Arc<K2KSubBroker>>>,
kernel_tenant: RwLock<HashMap<KernelId, TenantId>>,
registry: Arc<TenantRegistry>,
receipts: RwLock<HashMap<MessageId, DeliveryReceipt>>,
message_counter: AtomicU64,
cross_tenant_rejections: AtomicU64,
}
impl K2KBroker {
pub fn new(config: K2KConfig) -> Arc<Self> {
Self::with_registry(config, Arc::new(TenantRegistry::new()))
}
pub fn with_registry(config: K2KConfig, registry: Arc<TenantRegistry>) -> Arc<Self> {
let mut tenants = HashMap::new();
tenants.insert(
UNSPECIFIED_TENANT,
Arc::new(K2KSubBroker::new(UNSPECIFIED_TENANT)),
);
Arc::new(Self {
config,
tenants: RwLock::new(tenants),
kernel_tenant: RwLock::new(HashMap::new()),
registry,
receipts: RwLock::new(HashMap::new()),
message_counter: AtomicU64::new(0),
cross_tenant_rejections: AtomicU64::new(0),
})
}
pub fn registry(&self) -> &Arc<TenantRegistry> {
&self.registry
}
pub fn tenant_count(&self) -> usize {
self.tenants.read().len()
}
pub fn sub_broker(&self, tenant_id: TenantId) -> Option<Arc<K2KSubBroker>> {
self.tenants.read().get(&tenant_id).cloned()
}
pub fn register(self: &Arc<Self>, kernel_id: KernelId) -> K2KEndpoint {
self.register_tenant(UNSPECIFIED_TENANT, AuditTag::unspecified(), kernel_id)
}
pub fn register_tenant(
self: &Arc<Self>,
tenant_id: TenantId,
audit_tag: AuditTag,
kernel_id: KernelId,
) -> K2KEndpoint {
let (sender, receiver) = mpsc::channel(self.config.max_pending_messages);
let sub = {
let mut tenants = self.tenants.write();
tenants
.entry(tenant_id)
.or_insert_with(|| Arc::new(K2KSubBroker::new(tenant_id)))
.clone()
};
let mut kernel_tenant = self.kernel_tenant.write();
if let Some(prev_tenant) = kernel_tenant.get(&kernel_id).copied() {
if prev_tenant != tenant_id {
if let Some(prev_sub) = self.tenants.read().get(&prev_tenant).cloned() {
prev_sub.endpoints.write().remove(&kernel_id);
prev_sub.kernel_audit_tags.write().remove(&kernel_id);
prev_sub.routing_table.write().remove(&kernel_id);
}
}
}
kernel_tenant.insert(kernel_id.clone(), tenant_id);
drop(kernel_tenant);
sub.endpoints.write().insert(kernel_id.clone(), sender);
sub.kernel_audit_tags
.write()
.insert(kernel_id.clone(), audit_tag);
K2KEndpoint {
kernel_id,
receiver,
broker: Arc::clone(self),
}
}
pub fn unregister(&self, kernel_id: &KernelId) {
let tenant_id = self.kernel_tenant.write().remove(kernel_id);
if let Some(tenant_id) = tenant_id {
if let Some(sub) = self.tenants.read().get(&tenant_id).cloned() {
sub.endpoints.write().remove(kernel_id);
sub.kernel_audit_tags.write().remove(kernel_id);
sub.routing_table.write().remove(kernel_id);
}
}
}
pub fn is_registered(&self, kernel_id: &KernelId) -> bool {
self.kernel_tenant.read().contains_key(kernel_id)
}
pub fn tenant_of(&self, kernel_id: &KernelId) -> Option<TenantId> {
self.kernel_tenant.read().get(kernel_id).copied()
}
pub fn registered_kernels(&self) -> Vec<KernelId> {
self.kernel_tenant.read().keys().cloned().collect()
}
pub fn registered_kernels_for(&self, tenant_id: TenantId) -> Vec<KernelId> {
self.tenants
.read()
.get(&tenant_id)
.map(|sub| sub.endpoints.read().keys().cloned().collect())
.unwrap_or_default()
}
pub async fn send(
&self,
source: KernelId,
destination: KernelId,
envelope: MessageEnvelope,
) -> Result<DeliveryReceipt> {
self.send_priority(source, destination, envelope, 0).await
}
pub async fn send_priority(
&self,
source: KernelId,
destination: KernelId,
envelope: MessageEnvelope,
priority: u8,
) -> Result<DeliveryReceipt> {
let source_tenant = self
.kernel_tenant
.read()
.get(&source)
.copied()
.unwrap_or(UNSPECIFIED_TENANT);
let dest_tenant = self
.kernel_tenant
.read()
.get(&destination)
.copied()
.unwrap_or(source_tenant);
if source_tenant != dest_tenant {
self.cross_tenant_rejections.fetch_add(1, Ordering::Relaxed);
self.registry.audit_cross_tenant(
source_tenant,
dest_tenant,
source.as_str(),
destination.as_str(),
envelope.audit_tag,
);
return Err(RingKernelError::TenantMismatch {
from: source_tenant,
to: dest_tenant,
});
}
self.registry
.check_quota(source_tenant, envelope.audit_tag)?;
self.registry.record_message(source_tenant);
let mut envelope = envelope;
envelope.tenant_id = source_tenant;
if envelope.audit_tag.is_unspecified() {
let sub = self
.tenants
.read()
.get(&source_tenant)
.cloned()
.expect("tenant sub-broker must exist for registered sender");
envelope.audit_tag = sub.audit_tag_for(&source);
}
let timestamp = envelope.header.timestamp;
let mut message = K2KMessage::new(source.clone(), destination.clone(), envelope, timestamp);
message.priority = priority;
self.deliver_in(source_tenant, message).await
}
pub async fn send_with_audit(
&self,
source: KernelId,
destination: KernelId,
envelope: MessageEnvelope,
audit_tag: AuditTag,
) -> Result<DeliveryReceipt> {
let envelope = envelope.with_audit_tag(audit_tag);
self.send(source, destination, envelope).await
}
async fn deliver_in(
&self,
tenant_id: TenantId,
message: K2KMessage,
) -> Result<DeliveryReceipt> {
let sub = self
.tenants
.read()
.get(&tenant_id)
.cloned()
.ok_or_else(|| {
RingKernelError::K2KError(format!(
"tenant sub-broker {} disappeared mid-send",
tenant_id
))
})?;
let message_id = message.id;
let source = message.source.clone();
let destination = message.destination.clone();
let timestamp = message.sent_at;
let endpoints = sub.endpoints.read();
if let Some(sender) = endpoints.get(&destination) {
match sender.try_send(message) {
Ok(()) => {
sub.messages_delivered.fetch_add(1, Ordering::Relaxed);
self.message_counter.fetch_add(1, Ordering::Relaxed);
let receipt = DeliveryReceipt {
message_id,
source,
destination,
status: DeliveryStatus::Delivered,
timestamp,
};
self.receipts.write().insert(message_id, receipt.clone());
return Ok(receipt);
}
Err(mpsc::error::TrySendError::Full(_)) => {
return Ok(DeliveryReceipt {
message_id,
source,
destination,
status: DeliveryStatus::QueueFull,
timestamp,
});
}
Err(mpsc::error::TrySendError::Closed(_)) => {
return Ok(DeliveryReceipt {
message_id,
source,
destination,
status: DeliveryStatus::NotFound,
timestamp,
});
}
}
}
drop(endpoints);
let next_hop = sub.routing_table.read().get(&destination).cloned();
if let Some(next_hop) = next_hop {
let routed_message = K2KMessage {
id: message_id,
source: source.clone(),
destination: destination.clone(),
envelope: message.envelope,
hops: message.hops + 1,
sent_at: message.sent_at,
priority: message.priority,
};
if routed_message.hops > self.config.max_hops {
return Ok(DeliveryReceipt {
message_id,
source,
destination,
status: DeliveryStatus::MaxHopsExceeded,
timestamp,
});
}
let endpoints = sub.endpoints.read();
if let Some(sender) = endpoints.get(&next_hop) {
if sender.try_send(routed_message).is_ok() {
sub.messages_delivered.fetch_add(1, Ordering::Relaxed);
self.message_counter.fetch_add(1, Ordering::Relaxed);
return Ok(DeliveryReceipt {
message_id,
source,
destination,
status: DeliveryStatus::Pending,
timestamp,
});
}
}
}
Ok(DeliveryReceipt {
message_id,
source,
destination,
status: DeliveryStatus::NotFound,
timestamp,
})
}
pub fn add_route(&self, destination: KernelId, next_hop: KernelId) {
self.add_route_in(UNSPECIFIED_TENANT, destination, next_hop);
}
pub fn add_route_in(&self, tenant_id: TenantId, destination: KernelId, next_hop: KernelId) {
let sub = {
let mut tenants = self.tenants.write();
tenants
.entry(tenant_id)
.or_insert_with(|| Arc::new(K2KSubBroker::new(tenant_id)))
.clone()
};
sub.routing_table.write().insert(destination, next_hop);
}
pub fn remove_route(&self, destination: &KernelId) {
self.remove_route_in(UNSPECIFIED_TENANT, destination);
}
pub fn remove_route_in(&self, tenant_id: TenantId, destination: &KernelId) {
if let Some(sub) = self.tenants.read().get(&tenant_id).cloned() {
sub.routing_table.write().remove(destination);
}
}
pub fn stats(&self) -> K2KStats {
let tenants = self.tenants.read();
let mut registered = 0usize;
let mut routes = 0usize;
for sub in tenants.values() {
registered += sub.endpoints.read().len();
routes += sub.routing_table.read().len();
}
K2KStats {
registered_endpoints: registered,
messages_delivered: self.message_counter.load(Ordering::Relaxed),
routes_configured: routes,
tenant_count: tenants.len(),
cross_tenant_rejections: self.cross_tenant_rejections.load(Ordering::Relaxed),
}
}
pub fn tenant_stats(&self, tenant_id: TenantId) -> Option<TenantStats> {
self.tenants.read().get(&tenant_id).map(|sub| TenantStats {
tenant_id,
registered_endpoints: sub.endpoints.read().len(),
routes_configured: sub.routing_table.read().len(),
messages_delivered: sub.messages_delivered.load(Ordering::Relaxed),
})
}
pub fn get_receipt(&self, message_id: &MessageId) -> Option<DeliveryReceipt> {
self.receipts.read().get(message_id).cloned()
}
}
#[derive(Debug, Clone, Default)]
pub struct K2KStats {
pub registered_endpoints: usize,
pub messages_delivered: u64,
pub routes_configured: usize,
pub tenant_count: usize,
pub cross_tenant_rejections: u64,
}
#[derive(Debug, Clone, Default)]
pub struct TenantStats {
pub tenant_id: TenantId,
pub registered_endpoints: usize,
pub routes_configured: usize,
pub messages_delivered: u64,
}
pub struct K2KBuilder {
config: K2KConfig,
registry: Option<Arc<TenantRegistry>>,
}
impl K2KBuilder {
pub fn new() -> Self {
Self {
config: K2KConfig::default(),
registry: None,
}
}
pub fn max_pending_messages(mut self, count: usize) -> Self {
self.config.max_pending_messages = count;
self
}
pub fn delivery_timeout_ms(mut self, timeout: u64) -> Self {
self.config.delivery_timeout_ms = timeout;
self
}
pub fn enable_tracing(mut self, enable: bool) -> Self {
self.config.enable_tracing = enable;
self
}
pub fn max_hops(mut self, hops: u8) -> Self {
self.config.max_hops = hops;
self
}
pub fn with_registry(mut self, registry: Arc<TenantRegistry>) -> Self {
self.registry = Some(registry);
self
}
pub fn build(self) -> Arc<K2KBroker> {
match self.registry {
Some(registry) => K2KBroker::with_registry(self.config, registry),
None => K2KBroker::new(self.config),
}
}
}
impl Default for K2KBuilder {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone)]
pub struct K2KMessageRegistration {
pub type_id: u64,
pub type_name: &'static str,
pub k2k_routable: bool,
pub category: Option<&'static str>,
}
inventory::collect!(K2KMessageRegistration);
pub struct K2KTypeRegistry {
by_type_id: HashMap<u64, &'static K2KMessageRegistration>,
by_type_name: HashMap<&'static str, &'static K2KMessageRegistration>,
by_category: HashMap<&'static str, Vec<u64>>,
}
impl K2KTypeRegistry {
pub fn discover() -> Self {
let mut registry = Self {
by_type_id: HashMap::new(),
by_type_name: HashMap::new(),
by_category: HashMap::new(),
};
for reg in inventory::iter::<K2KMessageRegistration>() {
registry.by_type_id.insert(reg.type_id, reg);
registry.by_type_name.insert(reg.type_name, reg);
if let Some(cat) = reg.category {
registry
.by_category
.entry(cat)
.or_default()
.push(reg.type_id);
}
}
registry
}
pub fn is_routable(&self, type_id: u64) -> bool {
self.by_type_id
.get(&type_id)
.map(|r| r.k2k_routable)
.unwrap_or(false)
}
pub fn get(&self, type_id: u64) -> Option<&'static K2KMessageRegistration> {
self.by_type_id.get(&type_id).copied()
}
pub fn get_by_name(&self, type_name: &str) -> Option<&'static K2KMessageRegistration> {
self.by_type_name.get(type_name).copied()
}
pub fn get_category(&self, category: &str) -> &[u64] {
self.by_category
.get(category)
.map(|v| v.as_slice())
.unwrap_or(&[])
}
pub fn categories(&self) -> impl Iterator<Item = &'static str> + '_ {
self.by_category.keys().copied()
}
pub fn iter(&self) -> impl Iterator<Item = &'static K2KMessageRegistration> + '_ {
self.by_type_id.values().copied()
}
pub fn routable_types(&self) -> Vec<u64> {
self.by_type_id
.iter()
.filter(|(_, r)| r.k2k_routable)
.map(|(id, _)| *id)
.collect()
}
pub fn len(&self) -> usize {
self.by_type_id.len()
}
pub fn is_empty(&self) -> bool {
self.by_type_id.is_empty()
}
}
impl Default for K2KTypeRegistry {
fn default() -> Self {
Self::discover()
}
}
impl std::fmt::Debug for K2KTypeRegistry {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("K2KTypeRegistry")
.field("registered_types", &self.by_type_id.len())
.field("categories", &self.by_category.keys().collect::<Vec<_>>())
.finish()
}
}
#[cfg(feature = "crypto")]
#[derive(Debug, Clone)]
pub struct K2KEncryptionConfig {
pub enabled: bool,
pub algorithm: K2KEncryptionAlgorithm,
pub forward_secrecy: bool,
pub key_rotation_interval_secs: u64,
pub require_encryption: bool,
}
#[cfg(feature = "crypto")]
impl Default for K2KEncryptionConfig {
fn default() -> Self {
Self {
enabled: true,
algorithm: K2KEncryptionAlgorithm::Aes256Gcm,
forward_secrecy: true,
key_rotation_interval_secs: 3600, require_encryption: false,
}
}
}
#[cfg(feature = "crypto")]
impl K2KEncryptionConfig {
pub fn disabled() -> Self {
Self {
enabled: false,
..Default::default()
}
}
pub fn strict() -> Self {
Self {
enabled: true,
require_encryption: true,
forward_secrecy: true,
..Default::default()
}
}
}
#[cfg(feature = "crypto")]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum K2KEncryptionAlgorithm {
Aes256Gcm,
ChaCha20Poly1305,
}
#[cfg(feature = "crypto")]
pub struct K2KKeyMaterial {
kernel_id: KernelId,
long_term_key: [u8; 32],
session_key: parking_lot::RwLock<[u8; 32]>,
session_generation: std::sync::atomic::AtomicU64,
created_at: std::time::Instant,
last_rotated: parking_lot::RwLock<std::time::Instant>,
}
#[cfg(feature = "crypto")]
impl K2KKeyMaterial {
pub fn new(kernel_id: KernelId) -> Self {
use rand::RngCore;
let mut rng = rand::thread_rng();
let mut long_term_key = [0u8; 32];
let mut session_key = [0u8; 32];
rng.fill_bytes(&mut long_term_key);
rng.fill_bytes(&mut session_key);
let now = std::time::Instant::now();
Self {
kernel_id,
long_term_key,
session_key: parking_lot::RwLock::new(session_key),
session_generation: std::sync::atomic::AtomicU64::new(1),
created_at: now,
last_rotated: parking_lot::RwLock::new(now),
}
}
pub fn from_key(kernel_id: KernelId, key: [u8; 32]) -> Self {
use rand::RngCore;
let mut rng = rand::thread_rng();
let mut session_key = [0u8; 32];
rng.fill_bytes(&mut session_key);
let now = std::time::Instant::now();
Self {
kernel_id,
long_term_key: key,
session_key: parking_lot::RwLock::new(session_key),
session_generation: std::sync::atomic::AtomicU64::new(1),
created_at: now,
last_rotated: parking_lot::RwLock::new(now),
}
}
pub fn kernel_id(&self) -> &KernelId {
&self.kernel_id
}
pub fn session_key(&self) -> [u8; 32] {
*self.session_key.read()
}
pub fn session_generation(&self) -> u64 {
self.session_generation
.load(std::sync::atomic::Ordering::Acquire)
}
pub fn rotate_session_key(&self) {
use rand::RngCore;
let mut rng = rand::thread_rng();
let mut new_key = [0u8; 32];
rng.fill_bytes(&mut new_key);
*self.session_key.write() = new_key;
self.session_generation
.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
*self.last_rotated.write() = std::time::Instant::now();
}
pub fn derive_shared_secret(&self, dest_public_key: &[u8; 32]) -> [u8; 32] {
use sha2::{Digest, Sha256};
let mut hasher = Sha256::new();
hasher.update(&self.long_term_key);
hasher.update(dest_public_key);
hasher.update(b"k2k-shared-secret-v1");
let result = hasher.finalize();
let mut secret = [0u8; 32];
secret.copy_from_slice(&result);
secret
}
pub fn should_rotate(&self, interval_secs: u64) -> bool {
if interval_secs == 0 {
return false;
}
let elapsed = self.last_rotated.read().elapsed();
elapsed.as_secs() >= interval_secs
}
pub fn age(&self) -> std::time::Duration {
self.created_at.elapsed()
}
}
#[cfg(feature = "crypto")]
impl Drop for K2KKeyMaterial {
fn drop(&mut self) {
use zeroize::Zeroize;
self.long_term_key.zeroize();
self.session_key.write().zeroize();
}
}
#[cfg(feature = "crypto")]
#[derive(Debug, Clone)]
pub struct EncryptedK2KMessage {
pub id: MessageId,
pub source: KernelId,
pub destination: KernelId,
pub hops: u8,
pub sent_at: HlcTimestamp,
pub priority: u8,
pub key_generation: u64,
pub nonce: [u8; 12],
pub ciphertext: Vec<u8>,
pub tag: [u8; 16],
}
#[cfg(feature = "crypto")]
pub struct K2KEncryptor {
config: K2KEncryptionConfig,
key_material: K2KKeyMaterial,
peer_keys: parking_lot::RwLock<HashMap<KernelId, [u8; 32]>>,
stats: K2KEncryptionStats,
}
#[cfg(feature = "crypto")]
impl K2KEncryptor {
pub fn new(kernel_id: KernelId, config: K2KEncryptionConfig) -> Self {
Self {
config,
key_material: K2KKeyMaterial::new(kernel_id),
peer_keys: parking_lot::RwLock::new(HashMap::new()),
stats: K2KEncryptionStats::default(),
}
}
pub fn with_key(kernel_id: KernelId, key: [u8; 32], config: K2KEncryptionConfig) -> Self {
Self {
config,
key_material: K2KKeyMaterial::from_key(kernel_id, key),
peer_keys: parking_lot::RwLock::new(HashMap::new()),
stats: K2KEncryptionStats::default(),
}
}
pub fn public_key(&self) -> [u8; 32] {
use sha2::{Digest, Sha256};
let mut hasher = Sha256::new();
hasher.update(&self.key_material.long_term_key);
hasher.update(b"k2k-public-key-v1");
let result = hasher.finalize();
let mut public = [0u8; 32];
public.copy_from_slice(&result);
public
}
pub fn register_peer(&self, kernel_id: KernelId, public_key: [u8; 32]) {
self.peer_keys.write().insert(kernel_id, public_key);
}
pub fn unregister_peer(&self, kernel_id: &KernelId) {
self.peer_keys.write().remove(kernel_id);
}
pub fn maybe_rotate(&self) {
if self
.key_material
.should_rotate(self.config.key_rotation_interval_secs)
{
self.key_material.rotate_session_key();
self.stats
.key_rotations
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
}
pub fn encrypt(&self, message: &K2KMessage) -> Result<EncryptedK2KMessage> {
if !self.config.enabled {
return Err(RingKernelError::K2KError(
"K2K encryption is disabled".to_string(),
));
}
let peer_key = self
.peer_keys
.read()
.get(&message.destination)
.copied()
.ok_or_else(|| {
RingKernelError::K2KError(format!(
"No public key registered for destination kernel: {}",
message.destination
))
})?;
let shared_secret = self.key_material.derive_shared_secret(&peer_key);
let session_key = if self.config.forward_secrecy {
use sha2::{Digest, Sha256};
let mut hasher = Sha256::new();
hasher.update(&shared_secret);
hasher.update(&self.key_material.session_key());
let result = hasher.finalize();
let mut key = [0u8; 32];
key.copy_from_slice(&result);
key
} else {
shared_secret
};
use rand::RngCore;
let mut nonce = [0u8; 12];
rand::thread_rng().fill_bytes(&mut nonce);
let envelope_bytes = message.envelope.to_bytes();
let (ciphertext, tag) = match self.config.algorithm {
K2KEncryptionAlgorithm::Aes256Gcm => {
use aes_gcm::{
aead::{Aead, KeyInit},
Aes256Gcm, Nonce,
};
let cipher = Aes256Gcm::new_from_slice(&session_key)
.map_err(|e| RingKernelError::K2KError(format!("AES init failed: {}", e)))?;
let nonce_obj = Nonce::from_slice(&nonce);
let ciphertext = cipher
.encrypt(nonce_obj, envelope_bytes.as_slice())
.map_err(|e| RingKernelError::K2KError(format!("Encryption failed: {}", e)))?;
let tag_start = ciphertext.len() - 16;
let mut tag = [0u8; 16];
tag.copy_from_slice(&ciphertext[tag_start..]);
(ciphertext[..tag_start].to_vec(), tag)
}
K2KEncryptionAlgorithm::ChaCha20Poly1305 => {
use chacha20poly1305::{
aead::{Aead, KeyInit},
ChaCha20Poly1305, Nonce,
};
let cipher = ChaCha20Poly1305::new_from_slice(&session_key)
.map_err(|e| RingKernelError::K2KError(format!("ChaCha init failed: {}", e)))?;
let nonce_obj = Nonce::from_slice(&nonce);
let ciphertext = cipher
.encrypt(nonce_obj, envelope_bytes.as_slice())
.map_err(|e| RingKernelError::K2KError(format!("Encryption failed: {}", e)))?;
let tag_start = ciphertext.len() - 16;
let mut tag = [0u8; 16];
tag.copy_from_slice(&ciphertext[tag_start..]);
(ciphertext[..tag_start].to_vec(), tag)
}
};
self.stats
.messages_encrypted
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
self.stats.bytes_encrypted.fetch_add(
envelope_bytes.len() as u64,
std::sync::atomic::Ordering::Relaxed,
);
Ok(EncryptedK2KMessage {
id: message.id,
source: message.source.clone(),
destination: message.destination.clone(),
hops: message.hops,
sent_at: message.sent_at,
priority: message.priority,
key_generation: self.key_material.session_generation(),
nonce,
ciphertext,
tag,
})
}
pub fn decrypt(&self, encrypted: &EncryptedK2KMessage) -> Result<K2KMessage> {
if !self.config.enabled {
return Err(RingKernelError::K2KError(
"K2K encryption is disabled".to_string(),
));
}
let peer_key = self
.peer_keys
.read()
.get(&encrypted.source)
.copied()
.ok_or_else(|| {
RingKernelError::K2KError(format!(
"No public key registered for source kernel: {}",
encrypted.source
))
})?;
let shared_secret = self.key_material.derive_shared_secret(&peer_key);
let session_key = if self.config.forward_secrecy {
use sha2::{Digest, Sha256};
let mut hasher = Sha256::new();
hasher.update(&shared_secret);
hasher.update(&self.key_material.session_key());
let result = hasher.finalize();
let mut key = [0u8; 32];
key.copy_from_slice(&result);
key
} else {
shared_secret
};
let mut full_ciphertext = encrypted.ciphertext.clone();
full_ciphertext.extend_from_slice(&encrypted.tag);
let plaintext = match self.config.algorithm {
K2KEncryptionAlgorithm::Aes256Gcm => {
use aes_gcm::{
aead::{Aead, KeyInit},
Aes256Gcm, Nonce,
};
let cipher = Aes256Gcm::new_from_slice(&session_key)
.map_err(|e| RingKernelError::K2KError(format!("AES init failed: {}", e)))?;
let nonce = Nonce::from_slice(&encrypted.nonce);
cipher
.decrypt(nonce, full_ciphertext.as_slice())
.map_err(|e| {
self.stats
.decryption_failures
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
RingKernelError::K2KError(format!("Decryption failed: {}", e))
})?
}
K2KEncryptionAlgorithm::ChaCha20Poly1305 => {
use chacha20poly1305::{
aead::{Aead, KeyInit},
ChaCha20Poly1305, Nonce,
};
let cipher = ChaCha20Poly1305::new_from_slice(&session_key)
.map_err(|e| RingKernelError::K2KError(format!("ChaCha init failed: {}", e)))?;
let nonce = Nonce::from_slice(&encrypted.nonce);
cipher
.decrypt(nonce, full_ciphertext.as_slice())
.map_err(|e| {
self.stats
.decryption_failures
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
RingKernelError::K2KError(format!("Decryption failed: {}", e))
})?
}
};
let envelope = MessageEnvelope::from_bytes(&plaintext).map_err(|e| {
RingKernelError::K2KError(format!("Envelope deserialization failed: {}", e))
})?;
self.stats
.messages_decrypted
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
self.stats
.bytes_decrypted
.fetch_add(plaintext.len() as u64, std::sync::atomic::Ordering::Relaxed);
Ok(K2KMessage {
id: encrypted.id,
source: encrypted.source.clone(),
destination: encrypted.destination.clone(),
envelope,
hops: encrypted.hops,
sent_at: encrypted.sent_at,
priority: encrypted.priority,
})
}
pub fn stats(&self) -> K2KEncryptionStatsSnapshot {
K2KEncryptionStatsSnapshot {
messages_encrypted: self
.stats
.messages_encrypted
.load(std::sync::atomic::Ordering::Relaxed),
messages_decrypted: self
.stats
.messages_decrypted
.load(std::sync::atomic::Ordering::Relaxed),
bytes_encrypted: self
.stats
.bytes_encrypted
.load(std::sync::atomic::Ordering::Relaxed),
bytes_decrypted: self
.stats
.bytes_decrypted
.load(std::sync::atomic::Ordering::Relaxed),
key_rotations: self
.stats
.key_rotations
.load(std::sync::atomic::Ordering::Relaxed),
decryption_failures: self
.stats
.decryption_failures
.load(std::sync::atomic::Ordering::Relaxed),
peer_count: self.peer_keys.read().len(),
session_generation: self.key_material.session_generation(),
}
}
pub fn config(&self) -> &K2KEncryptionConfig {
&self.config
}
}
#[cfg(feature = "crypto")]
#[derive(Default)]
struct K2KEncryptionStats {
messages_encrypted: std::sync::atomic::AtomicU64,
messages_decrypted: std::sync::atomic::AtomicU64,
bytes_encrypted: std::sync::atomic::AtomicU64,
bytes_decrypted: std::sync::atomic::AtomicU64,
key_rotations: std::sync::atomic::AtomicU64,
decryption_failures: std::sync::atomic::AtomicU64,
}
#[cfg(feature = "crypto")]
#[derive(Debug, Clone, Default)]
pub struct K2KEncryptionStatsSnapshot {
pub messages_encrypted: u64,
pub messages_decrypted: u64,
pub bytes_encrypted: u64,
pub bytes_decrypted: u64,
pub key_rotations: u64,
pub decryption_failures: u64,
pub peer_count: usize,
pub session_generation: u64,
}
#[cfg(feature = "crypto")]
pub struct EncryptedK2KEndpoint {
inner: K2KEndpoint,
encryptor: Arc<K2KEncryptor>,
}
#[cfg(feature = "crypto")]
impl EncryptedK2KEndpoint {
pub fn new(inner: K2KEndpoint, encryptor: Arc<K2KEncryptor>) -> Self {
Self { inner, encryptor }
}
pub fn public_key(&self) -> [u8; 32] {
self.encryptor.public_key()
}
pub fn register_peer(&self, kernel_id: KernelId, public_key: [u8; 32]) {
self.encryptor.register_peer(kernel_id, public_key);
}
pub async fn send_encrypted(
&self,
destination: KernelId,
envelope: MessageEnvelope,
) -> Result<DeliveryReceipt> {
self.encryptor.maybe_rotate();
let timestamp = envelope.header.timestamp;
let message = K2KMessage::new(
self.inner.kernel_id.clone(),
destination.clone(),
envelope,
timestamp,
);
let _encrypted = self.encryptor.encrypt(&message)?;
self.inner.send(destination, message.envelope).await
}
pub async fn receive_decrypted(&mut self) -> Option<K2KMessage> {
self.inner.receive().await
}
pub fn encryption_stats(&self) -> K2KEncryptionStatsSnapshot {
self.encryptor.stats()
}
}
#[cfg(feature = "crypto")]
pub struct EncryptedK2KBuilder {
k2k_config: K2KConfig,
encryption_config: K2KEncryptionConfig,
}
#[cfg(feature = "crypto")]
impl EncryptedK2KBuilder {
pub fn new() -> Self {
Self {
k2k_config: K2KConfig::default(),
encryption_config: K2KEncryptionConfig::default(),
}
}
pub fn k2k_config(mut self, config: K2KConfig) -> Self {
self.k2k_config = config;
self
}
pub fn encryption_config(mut self, config: K2KEncryptionConfig) -> Self {
self.encryption_config = config;
self
}
pub fn with_forward_secrecy(mut self, enabled: bool) -> Self {
self.encryption_config.forward_secrecy = enabled;
self
}
pub fn with_algorithm(mut self, algorithm: K2KEncryptionAlgorithm) -> Self {
self.encryption_config.algorithm = algorithm;
self
}
pub fn with_key_rotation(mut self, interval_secs: u64) -> Self {
self.encryption_config.key_rotation_interval_secs = interval_secs;
self
}
pub fn require_encryption(mut self, required: bool) -> Self {
self.encryption_config.require_encryption = required;
self
}
pub fn build(self) -> (Arc<K2KBroker>, K2KEncryptionConfig) {
(K2KBroker::new(self.k2k_config), self.encryption_config)
}
}
#[cfg(feature = "crypto")]
impl Default for EncryptedK2KBuilder {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_k2k_broker_registration() {
let broker = K2KBuilder::new().build();
let kernel1 = KernelId::new("kernel1");
let kernel2 = KernelId::new("kernel2");
let _endpoint1 = broker.register(kernel1.clone());
let _endpoint2 = broker.register(kernel2.clone());
assert!(broker.is_registered(&kernel1));
assert!(broker.is_registered(&kernel2));
assert_eq!(broker.registered_kernels().len(), 2);
}
#[tokio::test]
async fn test_k2k_message_delivery() {
let broker = K2KBuilder::new().build();
let kernel1 = KernelId::new("kernel1");
let kernel2 = KernelId::new("kernel2");
let endpoint1 = broker.register(kernel1.clone());
let mut endpoint2 = broker.register(kernel2.clone());
let envelope = MessageEnvelope::empty(1, 2, HlcTimestamp::now(1));
let receipt = endpoint1.send(kernel2.clone(), envelope).await.unwrap();
assert_eq!(receipt.status, DeliveryStatus::Delivered);
let message = endpoint2.try_receive();
assert!(message.is_some());
assert_eq!(message.unwrap().source, kernel1);
}
#[test]
fn test_k2k_config_default() {
let config = K2KConfig::default();
assert_eq!(config.max_pending_messages, 1024);
assert_eq!(config.delivery_timeout_ms, 5000);
}
mod multi_tenant {
use super::*;
use crate::audit::MemorySink;
fn env() -> MessageEnvelope {
MessageEnvelope::empty(1, 2, HlcTimestamp::now(1))
}
#[tokio::test]
async fn legacy_single_tenant_send_unchanged() {
let broker = K2KBuilder::new().build();
let k1 = KernelId::new("k1");
let k2 = KernelId::new("k2");
let e1 = broker.register(k1.clone());
let mut e2 = broker.register(k2.clone());
let receipt = e1.send(k2.clone(), env()).await.unwrap();
assert_eq!(receipt.status, DeliveryStatus::Delivered);
let msg = e2.try_receive().unwrap();
assert_eq!(msg.source, k1);
assert_eq!(msg.envelope.tenant_id, UNSPECIFIED_TENANT);
}
#[tokio::test]
async fn single_tenant_fast_path_uses_unspecified_tenant() {
let broker = K2KBuilder::new().build();
let k = KernelId::new("k");
let _e = broker.register(k.clone());
assert_eq!(broker.tenant_of(&k), Some(UNSPECIFIED_TENANT));
assert_eq!(broker.tenant_count(), 1);
}
#[tokio::test]
async fn cross_tenant_send_rejected_with_tenant_mismatch() {
let broker = K2KBuilder::new().build();
broker
.registry()
.register(1, TenantQuota::default())
.unwrap();
broker
.registry()
.register(2, TenantQuota::default())
.unwrap();
let ka = KernelId::new("tenant1_kernel");
let kb = KernelId::new("tenant2_kernel");
let ea = broker.register_tenant(1, AuditTag::new(10, 100), ka.clone());
let _eb = broker.register_tenant(2, AuditTag::new(20, 200), kb.clone());
let err = ea.send(kb.clone(), env()).await.unwrap_err();
match err {
RingKernelError::TenantMismatch { from, to } => {
assert_eq!(from, 1);
assert_eq!(to, 2);
}
other => panic!("expected TenantMismatch, got {:?}", other),
}
assert_eq!(broker.stats().cross_tenant_rejections, 1);
}
#[tokio::test]
async fn cross_tenant_attempt_recorded_in_audit_sink() {
let sink = Arc::new(MemorySink::new(100));
let registry = Arc::new(TenantRegistry::with_audit_sink(sink.clone()));
registry.register(1, TenantQuota::default()).unwrap();
registry.register(2, TenantQuota::default()).unwrap();
let broker = K2KBuilder::new().with_registry(registry).build();
let ka = KernelId::new("ka");
let kb = KernelId::new("kb");
let ea = broker.register_tenant(1, AuditTag::new(10, 100), ka.clone());
let _eb = broker.register_tenant(2, AuditTag::new(20, 200), kb.clone());
let _ = ea.send(kb.clone(), env()).await.unwrap_err();
let events = sink.events();
assert_eq!(events.len(), 1);
assert!(events[0].description.contains("cross-tenant"));
let md: std::collections::HashMap<_, _> = events[0]
.metadata
.iter()
.cloned()
.collect::<std::collections::HashMap<_, _>>();
assert_eq!(md.get("from_tenant"), Some(&"1".to_string()));
assert_eq!(md.get("to_tenant"), Some(&"2".to_string()));
}
#[tokio::test]
async fn same_tenant_send_succeeds_with_audit_tag() {
let broker = K2KBuilder::new().build();
broker
.registry()
.register(1, TenantQuota::default())
.unwrap();
let ka = KernelId::new("a");
let kb = KernelId::new("b");
let ea = broker.register_tenant(1, AuditTag::new(10, 100), ka.clone());
let mut eb = broker.register_tenant(1, AuditTag::new(10, 100), kb.clone());
let receipt = ea.send(kb.clone(), env()).await.unwrap();
assert_eq!(receipt.status, DeliveryStatus::Delivered);
let msg = eb.try_receive().unwrap();
assert_eq!(msg.envelope.tenant_id, 1);
assert_eq!(msg.envelope.audit_tag, AuditTag::new(10, 100));
}
#[tokio::test]
async fn engagement_cost_accumulates_across_sends() {
let broker = K2KBuilder::new().build();
broker
.registry()
.register(1, TenantQuota::unlimited())
.unwrap();
let ka = KernelId::new("a");
let kb = KernelId::new("b");
let ea = broker.register_tenant(1, AuditTag::new(10, 100), ka.clone());
let _eb = broker.register_tenant(1, AuditTag::new(10, 100), kb.clone());
for _ in 0..4 {
let _ = ea.send(kb.clone(), env()).await.unwrap();
broker.registry().track_usage(
1,
AuditTag::new(10, 100),
std::time::Duration::from_millis(50),
);
}
let cost = broker
.registry()
.get_engagement_cost_for(1, AuditTag::new(10, 100));
assert_eq!(cost, std::time::Duration::from_millis(200));
}
#[tokio::test]
async fn engagement_cost_separate_across_audit_tags() {
let broker = K2KBuilder::new().build();
broker
.registry()
.register(1, TenantQuota::unlimited())
.unwrap();
let tag_a = AuditTag::new(10, 1);
let tag_b = AuditTag::new(10, 2);
broker
.registry()
.track_usage(1, tag_a, std::time::Duration::from_millis(150));
broker
.registry()
.track_usage(1, tag_b, std::time::Duration::from_millis(300));
assert_eq!(
broker.registry().get_engagement_cost_for(1, tag_a),
std::time::Duration::from_millis(150)
);
assert_eq!(
broker.registry().get_engagement_cost_for(1, tag_b),
std::time::Duration::from_millis(300)
);
}
#[tokio::test]
async fn quota_enforcement_rejects_over_rate_limit() {
let broker = K2KBuilder::new().build();
let mut quota = TenantQuota::default();
quota.max_messages_per_sec = 2;
broker.registry().register(1, quota).unwrap();
let ka = KernelId::new("a");
let kb = KernelId::new("b");
let ea = broker.register_tenant(1, AuditTag::new(10, 1), ka.clone());
let _eb = broker.register_tenant(1, AuditTag::new(10, 1), kb.clone());
assert!(ea.send(kb.clone(), env()).await.is_ok());
assert!(ea.send(kb.clone(), env()).await.is_ok());
let err = ea.send(kb.clone(), env()).await.unwrap_err();
assert!(matches!(err, RingKernelError::LoadSheddingRejected { .. }));
}
#[tokio::test]
async fn register_tenant_kernel_and_unregister() {
let broker = K2KBuilder::new().build();
broker
.registry()
.register(7, TenantQuota::default())
.unwrap();
let k = KernelId::new("k");
let _ep = broker.register_tenant(7, AuditTag::new(1, 1), k.clone());
assert_eq!(broker.tenant_of(&k), Some(7));
assert_eq!(broker.registered_kernels_for(7), vec![k.clone()]);
broker.unregister(&k);
assert!(!broker.is_registered(&k));
assert!(broker.registered_kernels_for(7).is_empty());
}
#[tokio::test]
async fn tenant_stats_reports_per_tenant_counts() {
let broker = K2KBuilder::new().build();
broker
.registry()
.register(1, TenantQuota::default())
.unwrap();
broker
.registry()
.register(2, TenantQuota::default())
.unwrap();
let _ea = broker.register_tenant(1, AuditTag::unspecified(), KernelId::new("a"));
let _eb = broker.register_tenant(1, AuditTag::unspecified(), KernelId::new("b"));
let _ec = broker.register_tenant(2, AuditTag::unspecified(), KernelId::new("c"));
let s1 = broker.tenant_stats(1).unwrap();
let s2 = broker.tenant_stats(2).unwrap();
assert_eq!(s1.registered_endpoints, 2);
assert_eq!(s2.registered_endpoints, 1);
}
#[tokio::test]
async fn re_registering_kernel_moves_it_between_tenants() {
let broker = K2KBuilder::new().build();
broker
.registry()
.register(1, TenantQuota::default())
.unwrap();
broker
.registry()
.register(2, TenantQuota::default())
.unwrap();
let k = KernelId::new("roaming");
let _e1 = broker.register_tenant(1, AuditTag::unspecified(), k.clone());
assert_eq!(broker.tenant_of(&k), Some(1));
let _e2 = broker.register_tenant(2, AuditTag::unspecified(), k.clone());
assert_eq!(broker.tenant_of(&k), Some(2));
assert!(broker.registered_kernels_for(1).is_empty());
assert_eq!(broker.registered_kernels_for(2), vec![k]);
}
#[tokio::test]
async fn send_with_audit_overrides_registration_tag() {
let broker = K2KBuilder::new().build();
broker
.registry()
.register(1, TenantQuota::unlimited())
.unwrap();
let ka = KernelId::new("a");
let kb = KernelId::new("b");
let _ea = broker.register_tenant(1, AuditTag::new(10, 1), ka.clone());
let mut eb = broker.register_tenant(1, AuditTag::new(10, 1), kb.clone());
let receipt = broker
.send_with_audit(ka.clone(), kb.clone(), env(), AuditTag::new(10, 99))
.await
.unwrap();
assert_eq!(receipt.status, DeliveryStatus::Delivered);
let msg = eb.try_receive().unwrap();
assert_eq!(msg.envelope.audit_tag, AuditTag::new(10, 99));
}
#[tokio::test]
async fn default_audit_tag_behavior() {
let broker = K2KBuilder::new().build();
let ka = KernelId::new("a");
let kb = KernelId::new("b");
let _ea = broker.register(ka.clone());
let mut eb = broker.register(kb.clone());
let _ = broker.send(ka.clone(), kb.clone(), env()).await.unwrap();
let msg = eb.try_receive().unwrap();
assert_eq!(msg.envelope.tenant_id, UNSPECIFIED_TENANT);
assert!(msg.envelope.audit_tag.is_unspecified());
}
}
#[cfg(feature = "crypto")]
mod crypto_tests {
use super::*;
#[test]
fn test_k2k_encryption_config_default() {
let config = K2KEncryptionConfig::default();
assert!(config.enabled);
assert!(config.forward_secrecy);
assert_eq!(config.algorithm, K2KEncryptionAlgorithm::Aes256Gcm);
assert_eq!(config.key_rotation_interval_secs, 3600);
}
#[test]
fn test_k2k_encryption_config_disabled() {
let config = K2KEncryptionConfig::disabled();
assert!(!config.enabled);
}
#[test]
fn test_k2k_encryption_config_strict() {
let config = K2KEncryptionConfig::strict();
assert!(config.enabled);
assert!(config.require_encryption);
assert!(config.forward_secrecy);
}
#[test]
fn test_k2k_key_material_creation() {
let kernel_id = KernelId::new("test_kernel");
let key_material = K2KKeyMaterial::new(kernel_id.clone());
assert_eq!(key_material.kernel_id(), &kernel_id);
assert_eq!(key_material.session_generation(), 1);
}
#[test]
fn test_k2k_key_material_rotation() {
let kernel_id = KernelId::new("test_kernel");
let key_material = K2KKeyMaterial::new(kernel_id);
let old_session_key = key_material.session_key();
let old_generation = key_material.session_generation();
key_material.rotate_session_key();
let new_session_key = key_material.session_key();
let new_generation = key_material.session_generation();
assert_ne!(old_session_key, new_session_key);
assert_eq!(new_generation, old_generation + 1);
}
#[test]
fn test_k2k_key_material_shared_secret() {
let kernel1 = K2KKeyMaterial::new(KernelId::new("kernel1"));
let kernel2 = K2KKeyMaterial::new(KernelId::new("kernel2"));
let pk1 = {
use sha2::{Digest, Sha256};
let mut hasher = Sha256::new();
hasher.update(&kernel1.long_term_key);
hasher.update(b"k2k-public-key-v1");
let result = hasher.finalize();
let mut public = [0u8; 32];
public.copy_from_slice(&result);
public
};
let pk2 = {
use sha2::{Digest, Sha256};
let mut hasher = Sha256::new();
hasher.update(&kernel2.long_term_key);
hasher.update(b"k2k-public-key-v1");
let result = hasher.finalize();
let mut public = [0u8; 32];
public.copy_from_slice(&result);
public
};
let secret1 = kernel1.derive_shared_secret(&pk2);
let secret2 = kernel2.derive_shared_secret(&pk1);
assert_eq!(secret1.len(), 32);
assert_eq!(secret2.len(), 32);
}
#[test]
fn test_k2k_encryptor_creation() {
let kernel_id = KernelId::new("test_kernel");
let config = K2KEncryptionConfig::default();
let encryptor = K2KEncryptor::new(kernel_id.clone(), config);
let public_key = encryptor.public_key();
assert_eq!(public_key.len(), 32);
let stats = encryptor.stats();
assert_eq!(stats.messages_encrypted, 0);
assert_eq!(stats.messages_decrypted, 0);
assert_eq!(stats.peer_count, 0);
}
#[test]
fn test_k2k_encryptor_peer_registration() {
let kernel_id = KernelId::new("test_kernel");
let config = K2KEncryptionConfig::default();
let encryptor = K2KEncryptor::new(kernel_id, config);
let peer_id = KernelId::new("peer_kernel");
let peer_key = [42u8; 32];
encryptor.register_peer(peer_id.clone(), peer_key);
assert_eq!(encryptor.stats().peer_count, 1);
encryptor.unregister_peer(&peer_id);
assert_eq!(encryptor.stats().peer_count, 0);
}
#[test]
fn test_k2k_encrypted_builder() {
let (broker, config) = EncryptedK2KBuilder::new()
.with_forward_secrecy(true)
.with_algorithm(K2KEncryptionAlgorithm::ChaCha20Poly1305)
.with_key_rotation(1800)
.require_encryption(true)
.build();
assert!(config.forward_secrecy);
assert_eq!(config.algorithm, K2KEncryptionAlgorithm::ChaCha20Poly1305);
assert_eq!(config.key_rotation_interval_secs, 1800);
assert!(config.require_encryption);
let stats = broker.stats();
assert_eq!(stats.registered_endpoints, 0);
}
#[test]
fn test_k2k_encryption_stats_snapshot() {
let stats = K2KEncryptionStatsSnapshot::default();
assert_eq!(stats.messages_encrypted, 0);
assert_eq!(stats.messages_decrypted, 0);
assert_eq!(stats.bytes_encrypted, 0);
assert_eq!(stats.bytes_decrypted, 0);
assert_eq!(stats.key_rotations, 0);
assert_eq!(stats.decryption_failures, 0);
assert_eq!(stats.peer_count, 0);
assert_eq!(stats.session_generation, 0);
}
#[test]
fn test_k2k_encryption_algorithms() {
assert_ne!(
K2KEncryptionAlgorithm::Aes256Gcm,
K2KEncryptionAlgorithm::ChaCha20Poly1305
);
}
#[test]
fn test_k2k_key_material_should_rotate() {
let kernel_id = KernelId::new("test_kernel");
let key_material = K2KKeyMaterial::new(kernel_id);
assert!(!key_material.should_rotate(0));
assert!(!key_material.should_rotate(3600));
}
#[test]
fn test_k2k_encryptor_disabled_encryption() {
let kernel_id = KernelId::new("test_kernel");
let config = K2KEncryptionConfig::disabled();
let encryptor = K2KEncryptor::new(kernel_id.clone(), config);
let envelope = MessageEnvelope::empty(1, 2, HlcTimestamp::now(1));
let message = K2KMessage::new(
kernel_id,
KernelId::new("dest"),
envelope,
HlcTimestamp::now(1),
);
let result = encryptor.encrypt(&message);
assert!(result.is_err());
}
#[test]
fn test_k2k_encryptor_missing_peer_key() {
let kernel_id = KernelId::new("test_kernel");
let config = K2KEncryptionConfig::default();
let encryptor = K2KEncryptor::new(kernel_id.clone(), config);
let envelope = MessageEnvelope::empty(1, 2, HlcTimestamp::now(1));
let message = K2KMessage::new(
kernel_id,
KernelId::new("unknown_dest"),
envelope,
HlcTimestamp::now(1),
);
let result = encryptor.encrypt(&message);
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("No public key"));
}
}
}