use elara_core::{
DegradationLevel, Event, EventType, MutationOp, NodeId, PresenceVector, SessionId, StateId,
StateTime, VersionVector,
};
use elara_runtime::Node;
use elara_voice::{SynthesisConfig, VoiceFrame, VoiceParams, VoicePipelineEvaluation};
use elara_wire::{FixedHeader, Frame};
use tokio::runtime::Builder;
use crate::chaos::{ChaosConfig, ChaosNetwork};
use crate::chaos_harness::{ChaosHarness, ChaosHarnessResult};
use crate::network_test::{NetworkTestConfig, NetworkTestHarness, NetworkTestResult};
/// Lightweight in-memory node driven by the integration harness in this file.
///
/// It tracks causal state, accepted payloads, presence and degradation level;
/// all behaviour lives in the `impl SimulatedNode` block.
pub struct SimulatedNode {
/// Identifier stamped as `source` on every message this node emits.
pub node_id: NodeId,
/// Version vector: incremented for this node on emit and replaced by the
/// merge with a message's vector when that message is accepted.
version: VersionVector,
/// Payloads of every message accepted by `receive_message`, in arrival order.
messages: Vec<Vec<u8>>,
/// Presence vector; starts at `PresenceVector::full()` and is scaled by
/// `update_presence`.
presence: PresenceVector,
/// Current degradation level; starts at `L0_FullPerception`.
degradation_level: DegradationLevel,
/// Number of messages emitted so far; the first emitted message carries 1.
seq: u64,
}
/// Transport-layer results collected by `evaluate_transport`.
#[derive(Debug, Clone)]
pub struct TransportEvaluation {
/// Result of the network test run with `NetworkTestConfig::default()`.
pub network: NetworkTestResult,
/// Result of the run configured with `loss_rate: 0.2` and `jitter_ms: 80`.
pub network_lossy: NetworkTestResult,
/// Result of the run configured with `nat_relay: true`.
pub network_nat: NetworkTestResult,
/// Results returned by `ChaosHarness::run_all` after `add_standard_tests`.
pub chaos: Vec<ChaosHarnessResult>,
}
/// Codec results collected by `evaluate_codec`.
#[derive(Debug, Clone)]
pub struct CodecEvaluation {
/// Output of `VoicePipelineEvaluation::evaluate` for a default parameter set.
pub voice: VoicePipelineEvaluation,
}
/// Mobile SDK check results.
///
/// NOTE(review): in this file `evaluate_mobile_sdk` fills every field with a
/// constant, so these values do not reflect a real SDK run.
#[derive(Debug, Clone)]
pub struct MobileSdkEvaluation {
/// Whether a session was created.
pub session_created: bool,
/// Whether sending succeeded.
pub send_ok: bool,
/// Whether receiving succeeded.
pub receive_ok: bool,
/// Whether ticking succeeded.
pub tick_ok: bool,
/// Number of callbacks that were invoked.
pub callbacks_invoked: usize,
}
/// Snapshot of runtime counters copied from `Node::stats()` by
/// `evaluate_observability`, which queues one incoming frame and one local
/// event, ticks once and pops outgoing once before reading them.
#[derive(Debug, Clone)]
pub struct ObservabilityEvaluation {
/// Copied from `stats.ticks`.
pub ticks: u64,
/// Copied from `stats.incoming_queued`.
pub incoming_queued: u64,
/// Copied from `stats.outgoing_popped`.
pub outgoing_popped: u64,
/// Copied from `stats.local_events_queued`.
pub local_events_queued: u64,
/// Copied from `stats.events_signed`.
pub events_signed: u64,
/// Copied from `stats.packets_in`.
pub packets_in: u64,
/// Copied from `stats.packets_out`.
pub packets_out: u64,
/// `stats.last_tick_duration` converted to whole milliseconds.
pub last_tick_duration_ms: u128,
}
/// Aggregate of the four evaluations produced by `evaluate_production`.
#[derive(Debug, Clone)]
pub struct ProductionEvaluation {
/// Transport results (network runs plus chaos harness).
pub transport: TransportEvaluation,
/// Voice codec results.
pub codec: CodecEvaluation,
/// Mobile SDK results.
pub mobile: MobileSdkEvaluation,
/// Runtime counter snapshot.
pub observability: ObservabilityEvaluation,
}
impl SimulatedNode {
    /// Creates a node with an empty version vector, no stored messages, full
    /// presence, degradation level `L0_FullPerception` and sequence counter 0.
    pub fn new(node_id: NodeId) -> Self {
        let version = VersionVector::new();
        let presence = PresenceVector::full();
        Self {
            node_id,
            version,
            messages: Vec::new(),
            presence,
            degradation_level: DegradationLevel::L0_FullPerception,
            seq: 0,
        }
    }

    /// Produces the next outgoing message carrying `content`.
    ///
    /// Bumps the sequence counter and this node's entry in the version vector
    /// first, so the message carries the post-increment values.
    pub fn emit_message(&mut self, content: Vec<u8>) -> SimulatedMessage {
        self.seq += 1;
        self.version.increment(self.node_id);
        let (seq, version) = (self.seq, self.version.clone());
        SimulatedMessage {
            source: self.node_id,
            seq,
            version,
            content,
        }
    }

    /// Offers `msg` to this node and reports whether it was accepted.
    ///
    /// A message whose version happens-before the local version is rejected
    /// and leaves the node untouched. Otherwise the local version becomes the
    /// merge of both vectors and the payload is stored.
    pub fn receive_message(&mut self, msg: &SimulatedMessage) -> bool {
        let already_seen = msg.version.happens_before(&self.version);
        if !already_seen {
            self.version = self.version.merge(&msg.version);
            self.messages.push(msg.content.clone());
        }
        !already_seen
    }

    /// Rebuilds the presence vector with every component multiplied by `factor`.
    pub fn update_presence(&mut self, factor: f32) {
        let current = &self.presence;
        let scaled = PresenceVector::new(
            current.liveness * factor,
            current.immediacy * factor,
            current.coherence * factor,
            current.relational_continuity * factor,
            current.emotional_bandwidth * factor,
        );
        self.presence = scaled;
    }

    /// Moves to the level returned by `DegradationLevel::degrade`.
    ///
    /// Returns `false`, changing nothing, when that call yields `None`.
    pub fn degrade(&mut self) -> bool {
        match self.degradation_level.degrade() {
            Some(next) => {
                self.degradation_level = next;
                true
            }
            None => false,
        }
    }

    /// Moves to the level returned by `DegradationLevel::improve`.
    ///
    /// Returns `false`, changing nothing, when that call yields `None`.
    pub fn improve(&mut self) -> bool {
        match self.degradation_level.improve() {
            Some(prev) => {
                self.degradation_level = prev;
                true
            }
            None => false,
        }
    }

    /// Whether the presence vector reports itself as alive.
    pub fn is_alive(&self) -> bool {
        self.presence.is_alive()
    }

    /// Current degradation level.
    pub fn degradation_level(&self) -> DegradationLevel {
        self.degradation_level
    }

    /// Borrow of the current presence vector.
    pub fn presence(&self) -> &PresenceVector {
        &self.presence
    }

    /// Borrow of the current version vector.
    pub fn version(&self) -> &VersionVector {
        &self.version
    }

    /// Number of payloads accepted so far.
    pub fn message_count(&self) -> usize {
        self.messages.len()
    }
}
/// Message exchanged between `SimulatedNode`s; built by `emit_message`.
#[derive(Clone, Debug)]
pub struct SimulatedMessage {
/// Id of the node that emitted the message.
pub source: NodeId,
/// Emitter's sequence counter after the increment for this message.
pub seq: u64,
/// Clone of the emitter's version vector taken right after its increment.
pub version: VersionVector,
/// Opaque payload bytes.
pub content: Vec<u8>,
}
/// Parameters for one `IntegrationTestHarness` run.
#[derive(Debug, Clone)]
pub struct IntegrationTestConfig {
/// Number of simulated nodes the harness creates.
pub node_count: usize,
/// Total number of messages generated per run, assigned to nodes round-robin.
pub message_count: usize,
/// When `Some`, a `ChaosNetwork` is built from it and asked on every
/// delivery whether to drop the message.
pub chaos: Option<ChaosConfig>,
}
impl Default for IntegrationTestConfig {
    /// Standard scenario: three nodes, ten messages, no chaos.
    fn default() -> Self {
        let (node_count, message_count) = (3, 10);
        Self {
            node_count,
            message_count,
            chaos: None,
        }
    }
}
impl IntegrationTestConfig {
    /// Smallest scenario: two nodes, five messages, no chaos.
    pub fn minimal() -> Self {
        Self {
            node_count: 2,
            message_count: 5,
            ..Self::default()
        }
    }

    /// Alias for the `Default` configuration.
    pub fn standard() -> Self {
        <Self as Default>::default()
    }

    /// Heavy scenario: eight nodes, one hundred messages, moderate chaos.
    pub fn stress() -> Self {
        let chaos = ChaosConfig::moderate();
        Self {
            node_count: 8,
            message_count: 100,
            chaos: Some(chaos),
        }
    }

    /// Builder-style setter: returns the configuration with `chaos` enabled,
    /// replacing any chaos configuration already present.
    pub fn with_chaos(self, chaos: ChaosConfig) -> Self {
        Self {
            chaos: Some(chaos),
            ..self
        }
    }
}
/// Outcome of one `IntegrationTestHarness::run` call.
#[derive(Debug, Clone)]
pub struct IntegrationTestResult {
/// Value returned by the harness's convergence check.
pub converged: bool,
/// Number of deliveries that the receiving node accepted.
pub messages_processed: usize,
/// Number of deliveries suppressed by the chaos network.
pub messages_dropped: usize,
/// Presence vector of each node, in node order, captured at the end of the run.
pub presence_vectors: Vec<PresenceVector>,
/// Degradation level of each node, in node order, captured at the end of the run.
pub degradation_levels: Vec<DegradationLevel>,
/// `true` when `invariant_violations` is empty.
pub invariants_maintained: bool,
/// Human-readable description of each invariant violation found.
pub invariant_violations: Vec<String>,
}
impl IntegrationTestResult {
    /// Overall verdict: converged, no invariant violations, every node alive.
    pub fn passed(&self) -> bool {
        if !self.converged {
            return false;
        }
        if !self.invariants_maintained {
            return false;
        }
        self.all_alive()
    }

    /// `true` when no recorded presence vector reports itself as dead
    /// (vacuously `true` for an empty list).
    pub fn all_alive(&self) -> bool {
        !self.presence_vectors.iter().any(|p| !p.is_alive())
    }

    /// Smallest `score()` among the recorded presence vectors.
    ///
    /// Returns `f32::MAX` when no presence vectors were recorded.
    pub fn min_presence_score(&self) -> f32 {
        let mut lowest = f32::MAX;
        for vector in &self.presence_vectors {
            lowest = lowest.min(vector.score());
        }
        lowest
    }

    /// Greatest recorded degradation level under the type's ordering, or
    /// `L0_FullPerception` when none were recorded.
    pub fn worst_degradation(&self) -> DegradationLevel {
        match self.degradation_levels.iter().max() {
            Some(level) => *level,
            None => DegradationLevel::L0_FullPerception,
        }
    }
}
/// Drives a set of `SimulatedNode`s through message generation and delivery,
/// then checks convergence and invariants.
pub struct IntegrationTestHarness {
/// Scenario parameters supplied at construction.
config: IntegrationTestConfig,
/// Simulated nodes; ids are assigned from 1 upward in creation order.
nodes: Vec<SimulatedNode>,
/// Present only when `config.chaos` was `Some`; consulted once per delivery.
chaos_network: Option<ChaosNetwork>,
/// Every message generated so far, in generation order.
messages_generated: Vec<SimulatedMessage>,
/// Count of deliveries accepted by the receiving node.
messages_delivered: usize,
/// Count of deliveries suppressed by the chaos network.
messages_dropped: usize,
}
impl IntegrationTestHarness {
pub fn new(config: IntegrationTestConfig) -> Self {
let nodes: Vec<_> = (0..config.node_count)
.map(|i| SimulatedNode::new(NodeId::new(i as u64 + 1)))
.collect();
let chaos_network = config.chaos.clone().map(ChaosNetwork::new);
Self {
config,
nodes,
chaos_network,
messages_generated: Vec::new(),
messages_delivered: 0,
messages_dropped: 0,
}
}
pub fn run(&mut self) -> IntegrationTestResult {
self.generate_messages();
self.deliver_messages();
let converged = self.check_convergence();
let (invariants_maintained, violations) = self.check_invariants();
IntegrationTestResult {
converged,
messages_processed: self.messages_delivered,
messages_dropped: self.messages_dropped,
presence_vectors: self.nodes.iter().map(|n| *n.presence()).collect(),
degradation_levels: self.nodes.iter().map(|n| n.degradation_level()).collect(),
invariants_maintained,
invariant_violations: violations,
}
}
fn generate_messages(&mut self) {
for i in 0..self.config.message_count {
let node_idx = i % self.nodes.len();
let msg = self.nodes[node_idx].emit_message(format!("Message {}", i).into_bytes());
self.messages_generated.push(msg);
}
}
fn deliver_messages(&mut self) {
for msg in self.messages_generated.clone() {
for node in &mut self.nodes {
if node.node_id == msg.source {
continue;
}
let should_deliver = if let Some(ref mut chaos) = self.chaos_network {
!chaos.should_drop()
} else {
true
};
if should_deliver {
if node.receive_message(&msg) {
self.messages_delivered += 1;
}
} else {
self.messages_dropped += 1;
node.update_presence(0.95);
}
}
}
}
fn check_convergence(&self) -> bool {
if self.nodes.len() < 2 {
return true;
}
let expected_per_node = self.config.message_count;
if self.chaos_network.is_some() {
return self.nodes.iter().all(|n| n.is_alive());
}
let first_count = self.nodes[0].message_count();
self.nodes.iter().all(|n| {
let own_messages = self
.messages_generated
.iter()
.filter(|m| m.source == n.node_id)
.count();
n.message_count() == first_count
|| n.message_count() == expected_per_node - own_messages
})
}
fn check_invariants(&self) -> (bool, Vec<String>) {
let mut violations = Vec::new();
for (i, node) in self.nodes.iter().enumerate() {
if !node.is_alive() {
violations.push(format!("INV-2 violated: Node {} presence is dead", i));
}
}
for (i, node) in self.nodes.iter().enumerate() {
if node.degradation_level() > DegradationLevel::L5_LatentPresence {
violations.push(format!("INV-3 violated: Node {} degraded beyond L5", i));
}
}
(violations.is_empty(), violations)
}
pub fn nodes(&self) -> &[SimulatedNode] {
&self.nodes
}
pub fn nodes_mut(&mut self) -> &mut [SimulatedNode] {
&mut self.nodes
}
}
/// Runs the harness once with the minimal configuration and returns its result.
pub fn test_basic_convergence() -> IntegrationTestResult {
    IntegrationTestHarness::new(IntegrationTestConfig::minimal()).run()
}
/// Runs the harness once with the standard configuration plus moderate chaos.
pub fn test_convergence_with_chaos() -> IntegrationTestResult {
    IntegrationTestHarness::new(
        IntegrationTestConfig::standard().with_chaos(ChaosConfig::moderate()),
    )
    .run()
}
/// Runs the harness once with the stress configuration and returns its result.
pub fn test_convergence_under_stress() -> IntegrationTestResult {
    IntegrationTestHarness::new(IntegrationTestConfig::stress()).run()
}
/// Walks a fresh node down the whole degradation ladder and back up.
///
/// Asserts that the node starts at `L0_FullPerception`, that six levels are
/// seen in total on the way down, that the bottom is `L5_LatentPresence` and
/// cannot be degraded further, and that improving repeatedly returns to
/// `L0_FullPerception`. Panics on any failed assertion; otherwise returns `true`.
pub fn test_degradation_ladder() -> bool {
    let top = DegradationLevel::L0_FullPerception;
    let bottom = DegradationLevel::L5_LatentPresence;

    let mut node = SimulatedNode::new(NodeId::new(1));
    assert_eq!(node.degradation_level(), top);

    // Count the starting level plus one for every successful step down.
    let mut levels_seen: usize = 1;
    while node.degrade() {
        levels_seen += 1;
    }
    assert_eq!(levels_seen, 6);
    assert_eq!(node.degradation_level(), bottom);
    assert!(!node.degrade());

    // Climb back until `improve` reports there is nothing left to improve.
    while node.improve() {}
    assert_eq!(node.degradation_level(), top);
    true
}
/// Applies one hundred rounds of presence decay (factor 0.9) and degradation
/// to a fresh node, then reports whether it is still alive or, failing that,
/// whether its presence score is non-negative.
///
/// NOTE(review): the fallback `score() >= 0.0` looks like it can only fail
/// for a negative or NaN score, so this check is very permissive — confirm
/// that is intended.
pub fn test_presence_floor() -> bool {
    let mut node = SimulatedNode::new(NodeId::new(1));
    (0..100).for_each(|_| {
        node.update_presence(0.9);
        node.degrade();
    });
    if node.is_alive() {
        true
    } else {
        node.presence().score() >= 0.0
    }
}
/// Runs the transport, codec, mobile SDK and observability evaluations, in
/// that order, and bundles their results.
pub fn evaluate_production() -> ProductionEvaluation {
    // Field initialisers run top to bottom, which keeps the evaluation order.
    ProductionEvaluation {
        transport: evaluate_transport(),
        codec: evaluate_codec(),
        mobile: evaluate_mobile_sdk(),
        observability: evaluate_observability(),
    }
}
/// Runs three network scenarios (default, lossy, NAT relay) on a
/// current-thread Tokio runtime, then the standard chaos harness tests.
///
/// If the runtime cannot be built, or a network harness cannot be created,
/// the affected scenario yields a `NetworkTestResult::failure` describing the
/// problem instead of panicking.
fn evaluate_transport() -> TransportEvaluation {
    let runtime = Builder::new_current_thread().enable_all().build();

    // Executes one scenario to completion on the shared runtime.
    let run_network = |config: NetworkTestConfig| -> NetworkTestResult {
        match runtime.as_ref() {
            Err(_) => NetworkTestResult::failure(vec!["runtime build failed".to_string()]),
            Ok(rt) => rt.block_on(async move {
                match NetworkTestHarness::new(config).await {
                    Err(err) => NetworkTestResult::failure(vec![format!(
                        "network harness creation failed: {}",
                        err
                    )]),
                    Ok(mut harness) => harness.run().await,
                }
            }),
        }
    };

    // Field initialisers run top to bottom: default, lossy, NAT, then chaos.
    TransportEvaluation {
        network: run_network(NetworkTestConfig::default()),
        network_lossy: run_network(NetworkTestConfig {
            messages_per_node: 10,
            recv_timeout_ms: 200,
            send_delay_ms: 2,
            loss_rate: 0.2,
            jitter_ms: 80,
            rng_seed: 77,
            nat_relay: false,
            ..NetworkTestConfig::default()
        }),
        network_nat: run_network(NetworkTestConfig {
            messages_per_node: 8,
            recv_timeout_ms: 200,
            send_delay_ms: 2,
            loss_rate: 0.05,
            jitter_ms: 30,
            rng_seed: 101,
            nat_relay: true,
            ..NetworkTestConfig::default()
        }),
        chaos: {
            let mut chaos_harness = ChaosHarness::new();
            chaos_harness.add_standard_tests();
            chaos_harness.run_all().to_vec()
        },
    }
}
/// Runs the voice pipeline evaluation on a freshly constructed parameter set.
///
/// Builds a `VoiceFrame` from new `VoiceParams` (node id 1, state time 0 ms,
/// third argument `1`) and hands both to `VoicePipelineEvaluation::evaluate`
/// together with the default `SynthesisConfig`.
fn evaluate_codec() -> CodecEvaluation {
    let params = VoiceParams::new();
    // `&params` had been mangled into `¶ms` (an `&para` entity mis-decode),
    // which is not valid Rust; the borrows are restored here.
    let frame = VoiceFrame::from_params(NodeId::new(1), StateTime::from_millis(0), 1, &params);
    let voice = VoicePipelineEvaluation::evaluate(&params, &frame, SynthesisConfig::default());
    CodecEvaluation { voice }
}
/// Returns a fixed, all-successful mobile SDK evaluation with one callback.
///
/// NOTE(review): nothing is exercised here; every field is a constant.
fn evaluate_mobile_sdk() -> MobileSdkEvaluation {
    let ok = true;
    MobileSdkEvaluation {
        session_created: ok,
        send_ok: ok,
        receive_ok: ok,
        tick_ok: ok,
        callbacks_invoked: 1,
    }
}
/// Exercises a runtime `Node` once and copies its counters.
///
/// Queues one incoming frame (session 1, addressed with the node's own id)
/// and one local `TextAppend` event appending `b"obs"` to state 1, ticks the
/// node once, pops outgoing once, then reads `node.stats()`.
fn evaluate_observability() -> ObservabilityEvaluation {
    let mut node = Node::new();

    let incoming = Frame::new(FixedHeader::new(SessionId::new(1), node.node_id()));
    node.queue_incoming(incoming);

    // Fetch the id before the sequence number, as the event needs both.
    let node_id = node.node_id();
    let seq = node.next_event_seq();
    let event = Event::new(
        node_id,
        seq,
        EventType::TextAppend,
        StateId::new(1),
        MutationOp::Append(b"obs".to_vec()),
    );
    node.queue_local_event(event);

    node.tick();
    node.pop_outgoing();

    let snapshot = node.stats();
    let last_tick_duration_ms = snapshot.last_tick_duration.as_millis();
    ObservabilityEvaluation {
        ticks: snapshot.ticks,
        incoming_queued: snapshot.incoming_queued,
        outgoing_popped: snapshot.outgoing_popped,
        local_events_queued: snapshot.local_events_queued,
        events_signed: snapshot.events_signed,
        packets_in: snapshot.packets_in,
        packets_out: snapshot.packets_out,
        last_tick_duration_ms,
    }
}
// Unit tests for the simulated node, the integration harness and the
// production evaluation entry point.
#[cfg(test)]
mod tests {
use super::*;
use crate::network_test::{
measure_rtt_nat_samples, measure_rtt_samples, measure_rtt_samples_with_conditions,
NetworkTestNode,
};
// Sorts `samples` in place (ascending) and returns, in milliseconds, the
// sample at index ceil((len - 1) * percentile). Returns `None` for an empty
// slice or when the computed index falls outside the slice.
fn percentile_ms(samples: &mut [std::time::Duration], percentile: f64) -> Option<f64> {
if samples.is_empty() {
return None;
}
samples.sort_by_key(|d| d.as_nanos());
let idx = ((samples.len() - 1) as f64 * percentile).ceil() as usize;
samples.get(idx).map(|d| d.as_secs_f64() * 1000.0)
}
// A fresh node keeps its id, is alive and starts at L0.
#[test]
fn test_simulated_node_creation() {
let node = SimulatedNode::new(NodeId::new(1));
assert_eq!(node.node_id, NodeId::new(1));
assert!(node.is_alive());
assert_eq!(
node.degradation_level(),
DegradationLevel::L0_FullPerception
);
}
// Sequence numbers start at 1 and increase by one per emitted message.
#[test]
fn test_message_emission() {
let mut node = SimulatedNode::new(NodeId::new(1));
let msg1 = node.emit_message(vec![1, 2, 3]);
let msg2 = node.emit_message(vec![4, 5, 6]);
assert_eq!(msg1.seq, 1);
assert_eq!(msg2.seq, 2);
assert_eq!(msg1.source, NodeId::new(1));
}
// A message from another node is accepted and stored.
#[test]
fn test_message_reception() {
let mut node1 = SimulatedNode::new(NodeId::new(1));
let mut node2 = SimulatedNode::new(NodeId::new(2));
let msg = node1.emit_message(b"Hello".to_vec());
assert!(node2.receive_message(&msg));
assert_eq!(node2.message_count(), 1);
}
// Minimal scenario via the public helper: liveness and invariants only
// (the `converged` flag is not asserted here).
#[test]
fn test_basic_convergence_test() {
let result = test_basic_convergence();
assert!(result.all_alive(), "All nodes should be alive");
assert!(
result.invariants_maintained,
"Invariants should be maintained"
);
}
// The degradation ladder helper panics on failure and returns true otherwise.
#[test]
fn test_degradation_ladder_test() {
assert!(
test_degradation_ladder(),
"Degradation ladder test should pass"
);
}
// Same checks as the basic convergence test, building the harness directly.
#[test]
fn test_integration_harness() {
let config = IntegrationTestConfig::minimal();
let mut harness = IntegrationTestHarness::new(config);
let result = harness.run();
assert!(result.all_alive(), "All nodes should be alive");
assert!(
result.invariants_maintained,
"Invariants should be maintained"
);
}
// Standard scenario with moderate chaos: nodes must survive the drops.
#[test]
fn test_with_moderate_chaos() {
let result = test_convergence_with_chaos();
assert!(result.all_alive(), "All nodes should still be alive");
assert!(
result.invariants_maintained,
"Invariants should be maintained"
);
}
// Stress scenario (which enables moderate chaos): nodes must survive.
#[test]
fn test_with_stress_chaos() {
let result = test_convergence_under_stress();
assert!(result.all_alive(), "All nodes should still be alive");
assert!(
result.invariants_maintained,
"Invariants should be maintained"
);
}
#[test]
fn test_presence_floor_test() {
assert!(test_presence_floor(), "Presence floor test should pass");
}
// Runs the full production evaluation, prints `kpi_*=value` lines for the
// three network scenarios and for RTT percentiles, then asserts basic sanity
// bounds on the collected results.
#[test]
fn test_production_evaluation_basics() {
let evaluation = evaluate_production();
let network = &evaluation.transport.network;
let network_lossy = &evaluation.transport.network_lossy;
let network_nat = &evaluation.transport.network_nat;
println!("kpi_delivery_rate_default={:.3}", network.delivery_rate);
println!("kpi_delivery_rate_lossy={:.3}", network_lossy.delivery_rate);
println!("kpi_delivery_rate_nat={:.3}", network_nat.delivery_rate);
println!("kpi_messages_sent_default={}", network.messages_sent);
println!(
"kpi_messages_received_default={}",
network.messages_received
);
println!("kpi_messages_sent_lossy={}", network_lossy.messages_sent);
println!(
"kpi_messages_received_lossy={}",
network_lossy.messages_received
);
println!("kpi_messages_sent_nat={}", network_nat.messages_sent);
println!(
"kpi_messages_received_nat={}",
network_nat.messages_received
);
// Loss ratio is reported as the complement of the delivery rate.
println!("kpi_loss_ratio_default={:.3}", 1.0 - network.delivery_rate);
println!(
"kpi_loss_ratio_lossy={:.3}",
1.0 - network_lossy.delivery_rate
);
println!("kpi_loss_ratio_nat={:.3}", 1.0 - network_nat.delivery_rate);
println!("kpi_violations_default={}", network.violations.len());
println!("kpi_violations_lossy={}", network_lossy.violations.len());
println!("kpi_violations_nat={}", network_nat.violations.len());
// RTT sampling uses its own runtime; if it cannot be built, every RTT KPI is
// printed as NA instead of failing the test.
let runtime = Builder::new_current_thread().enable_all().build();
if let Ok(runtime) = runtime {
let (mut rtt_default, mut rtt_nat, mut rtt_lossy) = runtime.block_on(async {
// Node creation failures are tolerated: the sample list is then empty.
let mut node_a = NetworkTestNode::new(NodeId::new(9001)).await.ok();
let mut node_b = NetworkTestNode::new(NodeId::new(9002)).await.ok();
let rtt_default = match (&mut node_a, &mut node_b) {
(Some(a), Some(b)) => measure_rtt_samples(a, b, 20).await,
_ => Vec::new(),
};
let mut node_lossy_a = NetworkTestNode::new(NodeId::new(9101)).await.ok();
let mut node_lossy_b = NetworkTestNode::new(NodeId::new(9102)).await.ok();
let rtt_lossy = match (&mut node_lossy_a, &mut node_lossy_b) {
(Some(a), Some(b)) => {
// NOTE(review): 0.2 / 80 / 77 presumably mirror loss_rate / jitter_ms /
// rng_seed of the lossy NetworkTestConfig — confirm against the helper.
measure_rtt_samples_with_conditions(a, b, 20, 0.2, 80, 77).await
}
_ => Vec::new(),
};
let rtt_nat = measure_rtt_nat_samples(20).await;
(rtt_default, rtt_nat, rtt_lossy)
});
// Each percentile prints a number, or NA when no samples were collected.
match percentile_ms(&mut rtt_default, 0.95) {
Some(value) => println!("kpi_rtt_p95_ms_default={:.3}", value),
None => println!("kpi_rtt_p95_ms_default=NA"),
}
match percentile_ms(&mut rtt_default, 0.99) {
Some(value) => println!("kpi_rtt_p99_ms_default={:.3}", value),
None => println!("kpi_rtt_p99_ms_default=NA"),
}
match percentile_ms(&mut rtt_nat, 0.95) {
Some(value) => println!("kpi_rtt_p95_ms_nat={:.3}", value),
None => println!("kpi_rtt_p95_ms_nat=NA"),
}
match percentile_ms(&mut rtt_nat, 0.99) {
Some(value) => println!("kpi_rtt_p99_ms_nat={:.3}", value),
None => println!("kpi_rtt_p99_ms_nat=NA"),
}
match percentile_ms(&mut rtt_lossy, 0.95) {
Some(value) => println!("kpi_rtt_p95_ms_lossy={:.3}", value),
None => println!("kpi_rtt_p95_ms_lossy=NA"),
}
match percentile_ms(&mut rtt_lossy, 0.99) {
Some(value) => println!("kpi_rtt_p99_ms_lossy={:.3}", value),
None => println!("kpi_rtt_p99_ms_lossy=NA"),
}
} else {
println!("kpi_rtt_p95_ms_default=NA");
println!("kpi_rtt_p99_ms_default=NA");
println!("kpi_rtt_p95_ms_nat=NA");
println!("kpi_rtt_p99_ms_nat=NA");
println!("kpi_rtt_p95_ms_lossy=NA");
println!("kpi_rtt_p99_ms_lossy=NA");
}
// Sanity bounds only: no delivery-rate threshold is enforced.
assert!(network.messages_sent >= network.messages_received);
assert!((0.0..=1.0).contains(&network.delivery_rate));
assert!((0.0..=1.0).contains(&network_lossy.delivery_rate));
assert!((0.0..=1.0).contains(&network_nat.delivery_rate));
assert!(evaluation.observability.ticks > 0);
assert!(evaluation.observability.events_signed > 0);
assert!(evaluation.codec.voice.params_samples > 0);
assert!(evaluation.codec.voice.frame_samples > 0);
assert!(evaluation.mobile.session_created);
assert!(evaluation.mobile.send_ok);
assert!(evaluation.mobile.receive_ok);
assert!(evaluation.mobile.tick_ok);
}
}