use synapse::{Config, SynapseRouter, SimpleMessage, MessageType};
use std::collections::HashMap;
use tokio::sync::Barrier;
use futures::future::join_all;
use anyhow::Result;
use std::sync::Arc;
#[cfg(test)]
mod concurrent_registry_tests {
use super::*;
#[tokio::test]
async fn test_concurrent_router_creation() -> Result<()> {
let num_tasks = 10;
let barrier = Arc::new(Barrier::new(num_tasks));
let mut tasks = Vec::new();
for i in 0..num_tasks {
let barrier_clone = Arc::clone(&barrier);
let entity_id = format!("entity_{}", i);
let task = tokio::spawn(async move {
barrier_clone.wait().await;
let config = Config::for_testing();
SynapseRouter::new(config, entity_id).await
});
tasks.push(task);
}
let results = join_all(tasks).await;
let mut success_count = 0;
for task_result in results {
match task_result {
Ok(Ok(_router)) => success_count += 1,
Ok(Err(e)) => println!("Router creation failed: {}", e),
Err(e) => println!("Task failed: {:?}", e),
}
}
assert!(success_count >= num_tasks * 8 / 10, "At least 80% of concurrent router creations should succeed");
println!("Successfully created {} out of {} routers concurrently", success_count, num_tasks);
Ok(())
}
#[tokio::test]
async fn test_concurrent_message_processing() -> Result<()> {
let config = Config::for_testing();
let router = Arc::new(SynapseRouter::new(config, "test_entity".to_string()).await?);
let num_messages = 20;
let barrier = Arc::new(Barrier::new(num_messages));
let mut tasks = Vec::new();
for i in 0..num_messages {
let router_clone = Arc::clone(&router);
let barrier_clone = Arc::clone(&barrier);
let task = tokio::spawn(async move {
barrier_clone.wait().await;
let message = SimpleMessage {
to: format!("Target_{}", i),
from_entity: "test_entity".to_string(),
content: format!("Concurrent message {}", i),
message_type: MessageType::Direct,
metadata: HashMap::new(),
};
router_clone.convert_to_secure_message(&message).await
});
tasks.push(task);
}
let results = join_all(tasks).await;
let mut success_count = 0;
for task_result in results {
match task_result {
Ok(Ok(_secure_msg)) => success_count += 1,
Ok(Err(e)) => println!("Message processing failed: {}", e),
Err(e) => println!("Task failed: {:?}", e),
}
}
assert!(success_count >= num_messages * 8 / 10, "At least 80% of concurrent message processing should succeed");
println!("Successfully processed {} out of {} messages concurrently", success_count, num_messages);
Ok(())
}
#[tokio::test]
async fn test_concurrent_config_creation() -> Result<()> {
let num_configs = 15;
let barrier = Arc::new(Barrier::new(num_configs));
let mut tasks = Vec::new();
for i in 0..num_configs {
let barrier_clone = Arc::clone(&barrier);
let entity_id = format!("config_entity_{}", i);
let task = tokio::spawn(async move {
barrier_clone.wait().await;
Config::default_for_entity(entity_id, "test".to_string())
});
tasks.push(task);
}
let results = join_all(tasks).await;
let mut success_count = 0;
for task_result in results {
match task_result {
Ok(_config) => success_count += 1,
Err(e) => println!("Task failed: {:?}", e),
}
}
assert_eq!(success_count, num_configs, "All concurrent config creations should succeed");
println!("Successfully created {} configs concurrently", success_count);
Ok(())
}
#[tokio::test]
async fn test_mixed_concurrent_operations() -> Result<()> {
let num_operations = 12;
let barrier = Arc::new(Barrier::new(num_operations));
let mut tasks = Vec::new();
for i in 0..(num_operations / 2) {
let barrier_clone = Arc::clone(&barrier);
let entity_id = format!("mixed_entity_{}", i);
let task = tokio::spawn(async move {
barrier_clone.wait().await;
let config = Config::for_testing();
let router = SynapseRouter::new(config, entity_id).await?;
let message = SimpleMessage {
to: "MixedTarget".to_string(),
from_entity: "mixed_entity".to_string(),
content: "Mixed operation message".to_string(),
message_type: MessageType::Direct,
metadata: HashMap::new(),
};
router.convert_to_secure_message(&message).await?;
Ok::<(), anyhow::Error>(())
});
tasks.push(task);
}
for i in (num_operations / 2)..num_operations {
let barrier_clone = Arc::clone(&barrier);
let task = tokio::spawn(async move {
barrier_clone.wait().await;
let _config = Config::default_for_entity(
format!("mixed_config_{}", i),
"test".to_string()
);
Ok::<(), anyhow::Error>(())
});
tasks.push(task);
}
let results = join_all(tasks).await;
let mut success_count = 0;
for task_result in results {
match task_result {
Ok(Ok(())) => success_count += 1,
Ok(Err(e)) => println!("Mixed operation failed: {}", e),
Err(e) => println!("Task failed: {:?}", e),
}
}
assert!(success_count >= num_operations * 7 / 10, "At least 70% of mixed concurrent operations should succeed");
println!("Successfully completed {} out of {} mixed operations concurrently", success_count, num_operations);
Ok(())
}
}