use trash_parallelism::channels::*;
#[test]
pub fn test_bounded_queue_3() {
let (tx, rx) = core::bounded_queue_3::<String>(5);
drop(tx);
drop(rx);
}
#[test]
pub fn test_create_bounded_channel() {
let (tx, rx) = core::create_bounded_channel::<String>(5);
drop(tx);
drop(rx);
}
#[test]
pub fn test_create_unbounded_channel() {
let (tx, rx) = core::create_unbounded_channel::<String>();
drop(tx);
drop(rx);
}
#[test]
pub fn test_message_new_and_verify() {
let msg = core::Message::new("test data".to_string());
assert!(msg.verify());
assert_eq!(msg.payload, "test data");
assert!(msg.id.starts_with("msg_"));
assert!(msg.checksum.is_some());
}
#[test]
pub fn test_send_async_and_recv_async() {
smol::block_on(async {
let (tx, rx) = core::bounded_queue_3::<String>(1);
core::send_async(&tx, "test message".to_string())
.await
.unwrap();
let received = core::recv_async(&rx).await.unwrap();
assert_eq!(received, "test message");
});
}
#[test]
pub fn test_send_json_message_and_recv_json_message() {
smol::block_on(async {
let (tx, rx) = core::bounded_queue_3::<core::JsonMessage>(1);
core::send_json_message(&tx, "test data").await.unwrap();
let received: String = core::recv_json_message(&rx).await.unwrap();
assert_eq!(received, "test data");
});
}
#[test]
pub fn test_broadcast_message() {
smol::block_on(async {
let (tx1, rx1) = core::bounded_queue_3::<String>(1);
let (tx2, rx2) = core::bounded_queue_3::<String>(1);
let senders = vec![tx1, tx2];
core::broadcast_message("broadcast".to_string(), senders)
.await
.unwrap();
let msg1 = core::recv_async(&rx1).await.unwrap();
let msg2 = core::recv_async(&rx2).await.unwrap();
assert_eq!(msg1, "broadcast");
assert_eq!(msg2, "broadcast");
});
}
#[test]
pub fn test_benchmark_channel() {
smol::block_on(async {
let (tx, rx) = core::bounded_queue_3::<String>(100);
let stats = core::benchmark_channel(&tx, &rx, "test".to_string(), 10).await;
assert_eq!(stats.messages_sent, 10);
assert_eq!(stats.messages_received, 10);
assert!(stats.avg_latency.is_some());
});
}
#[test]
pub fn test_monitored_channel() {
let channel = monitoring::create_monitored_channel::<String>(10);
drop(channel);
}
#[test]
pub fn test_monitored_channel_send_recv() {
smol::block_on(async {
let channel = monitoring::MonitoredChannel::new();
channel.send_async("test".to_string()).await.unwrap();
let received = channel.recv_async().await.unwrap();
assert_eq!(received, "test");
let stats = channel.stats();
assert_eq!(stats.messages_sent, 1);
assert_eq!(stats.messages_received, 1);
});
}
#[test]
pub fn test_monitored_channel_builder() {
let channel = monitoring::MonitoredChannel::builder().capacity(50).build();
smol::block_on(async {
channel.send_async("test".to_string()).await.unwrap();
let stats = channel.stats();
assert_eq!(stats.messages_sent, 1);
});
}
#[test]
pub fn test_channel_stats_to_json() {
let stats = monitoring::ChannelStats {
messages_sent: 5,
messages_received: 3,
..Default::default()
};
let json = stats.to_json().unwrap();
assert!(json.contains("messages_sent"));
assert!(json.contains('5'));
assert!(json.contains("messages_received"));
assert!(json.contains('3'));
}
#[test]
pub fn test_channel_stats() {
let mut stats = monitoring::ChannelStats::default();
assert_eq!(stats.messages_sent, 0);
stats.messages_sent = 10;
stats.reset();
assert_eq!(stats.messages_sent, 0);
}
#[test]
pub fn test_channel_multiplexer() {
let multiplexer = multiplexor::ChannelMultiplexer::new();
drop(multiplexer);
}
#[test]
pub fn test_channel_multiplexer_route() {
smol::block_on(async {
let multiplexer = multiplexor::ChannelMultiplexer::new();
let (tx, rx) = core::bounded_queue_3::<String>(1);
multiplexer.register_route("test", tx);
multiplexer
.route_message("test", "message".to_string())
.await
.unwrap();
let received = core::recv_async(&rx).await.unwrap();
assert_eq!(received, "message");
});
}
#[test]
pub fn test_async_channel_processor() {
smol::block_on(async {
let (tx, rx) = core::bounded_queue_3::<String>(1);
let processor = multiplexor::AsyncChannelProcessor::new(rx, |_msg: String| {
Box::pin(async move {
Ok(())
})
});
processor.start();
tx.send("test".to_string()).await.unwrap();
smol::Timer::after(std::time::Duration::from_millis(10)).await;
});
}
#[test]
pub fn test_parallel_channel_processor() {
smol::block_on(async {
let (tx1, rx1) = core::bounded_queue_3::<i32>(5);
let (tx2, rx2) = core::bounded_queue_3::<i32>(5);
let receivers = vec![rx1, rx2];
let processor = specialist::ParallelChannelProcessor::new(receivers, |x| x * 2);
processor.start();
tx1.send(1).await.unwrap();
tx2.send(2).await.unwrap();
tx1.send(3).await.unwrap();
smol::Timer::after(std::time::Duration::from_millis(50)).await;
});
}
#[test]
pub fn test_parallel_channel_processor_multiple_receivers() {
smol::block_on(async {
let mut receivers = Vec::new();
let mut senders = Vec::new();
for _ in 0..3 {
let (tx, rx) = core::bounded_queue_3::<String>(5);
receivers.push(rx);
senders.push(tx);
}
let processor =
specialist::ParallelChannelProcessor::new(receivers, |s: String| s.to_uppercase());
processor.start();
for (i, sender) in senders.iter().enumerate() {
sender.send(format!("msg{i}")).await.unwrap();
}
smol::Timer::after(std::time::Duration::from_millis(50)).await;
});
}
#[test]
pub fn test_parallel_channel_processor_creation() {
let (tx, rx) = core::bounded_queue_3::<f64>(5);
let receivers = vec![rx];
let processor = specialist::ParallelChannelProcessor::new(receivers, |x| x + 1.0);
drop(processor);
drop(tx);
}
#[test]
pub fn test_persistent_channel() {
smol::block_on(async {
let (tx, _rx) = core::bounded_queue_3::<String>(5);
let temp_file = tempfile::NamedTempFile::new().unwrap();
let log_path = temp_file.path().to_str().unwrap();
let channel = specialist::PersistentChannel::new(tx, log_path).unwrap();
channel
.send_persistent("test message".to_string())
.await
.unwrap();
smol::Timer::after(std::time::Duration::from_millis(100)).await;
});
}
#[test]
pub fn test_persistent_channel_send_persistent() {
smol::block_on(async {
let (tx, rx) = core::bounded_queue_3::<String>(5);
let temp_file = tempfile::NamedTempFile::new().unwrap();
let log_path = temp_file.path().to_str().unwrap();
let channel = specialist::PersistentChannel::new(tx, log_path).unwrap();
channel
.send_persistent("persistent data".to_string())
.await
.unwrap();
smol::Timer::after(std::time::Duration::from_millis(100)).await;
let received = core::recv_async(&rx).await.unwrap();
assert_eq!(received, "persistent data");
});
}
#[test]
pub fn test_persistent_channel_recover_messages() {
smol::block_on(async {
let (tx, _rx) = core::bounded_queue_3::<String>(5);
let temp_file = tempfile::NamedTempFile::new().unwrap();
let log_path = temp_file.path().to_str().unwrap();
{
let channel = specialist::PersistentChannel::new(tx, log_path).unwrap();
channel.send_persistent("msg1".to_string()).await.unwrap();
channel.send_persistent("msg2".to_string()).await.unwrap();
smol::Timer::after(std::time::Duration::from_millis(100)).await;
}
let file_content = std::fs::read_to_string(log_path).unwrap();
let lines: Vec<&str> = file_content.lines().collect();
let mut recovered = Vec::new();
for line in lines {
if !line.trim().is_empty() {
let data: String = serde_json::from_str(line).unwrap();
recovered.push(data);
}
}
assert_eq!(recovered, vec!["msg1".to_string(), "msg2".to_string()]);
});
}
#[test]
pub fn test_persistent_channel_file_operations() {
smol::block_on(async {
let temp_file = tempfile::NamedTempFile::new().unwrap();
let log_path = temp_file.path().to_str().unwrap();
let (tx, _rx) = core::bounded_queue_3::<i32>(5);
let channel = specialist::PersistentChannel::new(tx, log_path).unwrap();
for i in 0..3 {
channel.send_persistent(i).await.unwrap();
}
smol::Timer::after(std::time::Duration::from_millis(100)).await;
let file_content = std::fs::read_to_string(log_path).unwrap();
let lines: Vec<&str> = file_content.lines().collect();
let mut recovered = Vec::new();
for line in lines {
if !line.trim().is_empty() {
let data: i32 = serde_json::from_str(line).unwrap();
recovered.push(data);
}
}
assert_eq!(recovered, vec![0, 1, 2]);
});
}
#[test]
pub fn test_work_queue() {
let queue = queue::WorkQueue::<String, String>::new(2);
drop(queue);
}
#[test]
pub fn test_work_queue_submit() {
smol::block_on(async {
let queue = queue::WorkQueue::<String, ()>::new(1);
queue.submit("task".to_string()).await.unwrap();
});
}
#[test]
pub fn test_work_queue_multiple_workers() {
smol::block_on(async {
let queue = queue::WorkQueue::<String, ()>::new(3);
for i in 0..5 {
queue.submit(format!("task{i}")).await.unwrap();
}
});
}
#[test]
pub fn test_work_queue_round_robin() {
smol::block_on(async {
let queue = queue::WorkQueue::<i32, ()>::new(2);
for i in 0..4 {
queue.submit(i).await.unwrap();
}
});
}
#[test]
pub fn test_work_queue_collect() {
smol::block_on(async {
let queue = queue::WorkQueue::<String, String>::new(1);
let result = queue.collect().await;
assert!(result.is_err()); });
}
#[test]
pub fn test_work_queue_zero_workers() {
let queue = queue::WorkQueue::<String, String>::new(0);
smol::block_on(async {
let result = queue.submit("task".to_string()).await;
assert!(result.is_err()); });
}
#[test]
pub fn test_work_queue_large_number_workers() {
let queue = queue::WorkQueue::<i32, String>::new(10);
smol::block_on(async {
for i in 0..20 {
queue.submit(i).await.unwrap();
}
});
}
#[test]
pub fn test_base64_channel() {
let (tx, _) = core::bounded_queue_3::<String>(1);
let channel = specialist::Base64Channel::new(tx);
drop(channel);
}
#[test]
pub fn test_base64_channel_send_recv() {
smol::block_on(async {
let (tx, rx) = core::bounded_queue_3::<String>(1);
let channel = specialist::Base64Channel::new(tx);
channel.send_base64(&"test data".to_string()).await.unwrap();
let received: String = specialist::Base64Channel::recv_base64(&rx).await.unwrap();
assert_eq!(received, "test data");
});
}
#[test]
pub fn test_compressed_channel() {
let channel = specialist::CompressedChannel::new();
drop(channel);
}
#[test]
pub fn test_compressed_channel_send_recv() {
smol::block_on(async {
let channel = specialist::CompressedChannel::new();
let data = "test compression data".to_string();
channel.send_compressed(&data).await.unwrap();
let received: String = channel.recv_decompressed().await.unwrap();
assert_eq!(received, data);
});
}
#[test]
pub fn test_compressed_channel_with_config() {
let channel = specialist::CompressedChannel::with_config(50, 9);
drop(channel);
}
#[test]
pub fn test_compressed_channel_builder() {
let channel = specialist::CompressedChannel::builder()
.capacity(200)
.compression_level(11)
.build();
drop(channel);
}
#[test]
pub fn test_file_backed_channel() {
}
#[test]
pub fn test_file_backed_channel_send() {
smol::block_on(async {
let channel: specialist::FileBackedChannel<String> =
specialist::FileBackedChannel::new().unwrap();
channel.send("test data".to_string()).await.unwrap();
let flushed = channel.flush_to_memory().unwrap();
assert_eq!(flushed.len(), 0);
});
}
#[test]
pub fn test_file_backed_channel_overflow() {
smol::block_on(async {
let channel: specialist::FileBackedChannel<String> =
specialist::FileBackedChannel::new().unwrap();
for i in 0..110 {
channel.send(format!("message{i}")).await.unwrap();
}
smol::Timer::after(std::time::Duration::from_millis(50)).await;
let flushed = channel.flush_to_memory().unwrap();
assert!(flushed.is_empty());
});
}
#[test]
pub fn test_file_backed_channel_flush_to_memory() {
smol::block_on(async {
let channel: specialist::FileBackedChannel<String> =
specialist::FileBackedChannel::new().unwrap();
for i in 0..10 {
channel.send(format!("data{i}")).await.unwrap();
}
smol::Timer::after(std::time::Duration::from_millis(10)).await;
let flushed: Vec<String> = channel.flush_to_memory().unwrap();
assert!(flushed.is_empty());
});
}
#[test]
pub fn test_file_backed_channel_multiple_sends() {
smol::block_on(async {
let channel: specialist::FileBackedChannel<i32> =
specialist::FileBackedChannel::new().unwrap();
for i in 0..10 {
channel.send(i).await.unwrap();
}
let flushed = channel.flush_to_memory().unwrap();
assert_eq!(flushed.len(), 0);
});
}
#[test]
pub fn test_rate_limited_channel() {
let channel: specialist::RateLimitedChannel<String> =
specialist::RateLimitedChannel::new(10, 10.0, 1.0);
drop(channel);
}
#[test]
pub fn test_rate_limited_channel_send() {
smol::block_on(async {
let (rx_tx, rx) = core::bounded_queue_3::<String>(10);
let channel: specialist::RateLimitedChannel<String> =
specialist::RateLimitedChannel::new(1, 10.0, 1.0);
drop(channel);
drop(rx_tx);
drop(rx);
});
}
#[test]
pub fn test_rate_limited_channel_within_limit() {
smol::block_on(async {
let channel: specialist::RateLimitedChannel<String> =
specialist::RateLimitedChannel::new(10, 5.0, 10.0); for i in 0..5 {
channel.send(format!("msg{i}")).await.unwrap();
}
});
}
#[test]
pub fn test_rate_limited_channel_exceed_limit() {
smol::block_on(async {
let channel: specialist::RateLimitedChannel<String> =
specialist::RateLimitedChannel::new(10, 2.0, 0.0); channel.send("msg1".to_string()).await.unwrap();
channel.send("msg2".to_string()).await.unwrap();
let result = channel.send("msg3".to_string()).await;
assert!(result.is_err());
assert_eq!(result.unwrap_err().to_string(), "Rate limit exceeded");
});
}
#[test]
pub fn test_rate_limited_channel_refill() {
smol::block_on(async {
let channel: specialist::RateLimitedChannel<String> =
specialist::RateLimitedChannel::new(10, 1.0, 100.0); channel.send("msg1".to_string()).await.unwrap();
let result = channel.send("msg2".to_string()).await;
assert!(result.is_err());
smol::Timer::after(std::time::Duration::from_millis(20)).await;
channel.send("msg3".to_string()).await.unwrap();
});
}
#[test]
pub fn test_priority_channel() {
let channel: specialist::PriorityChannel<String> = specialist::PriorityChannel::new(10);
drop(channel);
}
#[test]
pub fn test_priority_channel_send_recv() {
smol::block_on(async {
let channel: specialist::PriorityChannel<String> = specialist::PriorityChannel::new(10);
channel.send_normal("normal".to_string()).await.unwrap();
channel.send_high("high".to_string()).await.unwrap();
channel.send_low("low".to_string()).await.unwrap();
let received = channel.recv().await.unwrap();
assert_eq!(received, "high");
let received = channel.recv().await.unwrap();
assert_eq!(received, "normal");
let received = channel.recv().await.unwrap();
assert_eq!(received, "low");
});
}
#[test]
pub fn test_fast_message_parser() {
let parser = parsers::FastMessageParser::new('\n');
let buffer = b"line1\nline2\nline3";
let messages = parser.parse_messages(buffer);
assert_eq!(messages.len(), 3);
assert_eq!(messages[0], b"line1");
assert_eq!(messages[1], b"line2");
assert_eq!(messages[2], b"line3");
}
#[test]
pub fn test_fast_message_parser_json() {
let parser = parsers::FastMessageParser::new('\n');
let buffer = b"{\"name\":\"Alice\"}\n{\"name\":\"Bob\"}";
let messages = parser.parse_json_messages(buffer).unwrap();
assert_eq!(messages.len(), 2);
assert_eq!(messages[0]["name"], "Alice");
assert_eq!(messages[1]["name"], "Bob");
}
#[test]
pub fn test_channel_aggregator() {
smol::block_on(async {
let (tx1, rx1) = core::bounded_queue_3::<String>(1);
let (tx_out, rx_out) = core::bounded_queue_3::<String>(2);
let aggregator = parsers::ChannelAggregator::new(vec![rx1], tx_out);
aggregator.start();
tx1.send("message".to_string()).await.unwrap();
let received = core::recv_async(&rx_out).await.unwrap();
assert_eq!(received, "message");
});
}
#[test]
pub fn test_batching_channel() {
let channel: parsers::BatchingChannel<String> = parsers::BatchingChannel::new(3, 10);
drop(channel);
}
#[test]
pub fn test_batching_channel_send() {
smol::block_on(async {
let channel: parsers::BatchingChannel<String> = parsers::BatchingChannel::new(2, 10);
let batch_receiver = channel.batch_receiver();
channel.send("item1".to_string()).await.unwrap();
channel.send("item2".to_string()).await.unwrap();
let batch = core::recv_async(&batch_receiver).await.unwrap();
assert_eq!(batch.len(), 2);
assert_eq!(batch[0], "item1");
assert_eq!(batch[1], "item2");
});
}
#[test]
pub fn test_filtered_channel() {
let (tx, _) = core::bounded_queue_3::<i32>(10);
let filtered = parsers::FilteredChannel::new(tx, |&num| num > 0);
drop(filtered);
}
#[test]
pub fn test_filtered_channel_send() {
smol::block_on(async {
let (tx, rx) = core::bounded_queue_3::<i32>(10);
let filtered = parsers::FilteredChannel::new(tx, |&num| num > 0);
filtered.send_filtered(5).await.unwrap(); filtered.send_filtered(-1).await.unwrap(); filtered.send_filtered(10).await.unwrap();
let positive1 = core::recv_async(&rx).await.unwrap();
assert_eq!(positive1, 5);
let positive2 = core::recv_async(&rx).await.unwrap();
assert_eq!(positive2, 10);
});
}
#[test]
pub fn test_create_async_processor() {
let (tx, rx) = core::bounded_queue_3::<i32>(1);
let processor = multiplexor::create_async_processor(rx, |_num: i32| {
Box::pin(async move {
Ok(())
})
});
drop(processor);
drop(tx);
}
#[test]
pub fn test_persistent_channel_recover_messages_file_not_found() {
let result: Result<Vec<String>, Box<dyn std::error::Error>> =
specialist::PersistentChannel::<String>::recover_messages("non_existent_file.log");
assert!(result.is_err());
}
#[test]
pub fn test_parallel_channel_processor_empty_receivers() {
let receivers: Vec<smol::channel::Receiver<i32>> = vec![];
let processor = specialist::ParallelChannelProcessor::new(receivers, |x| x * 2);
processor.start();
}
#[test]
pub fn test_rate_limited_channel_zero_tokens() {
smol::block_on(async {
let channel: specialist::RateLimitedChannel<String> =
specialist::RateLimitedChannel::new(10, 0.0, 1.0); let result = channel.send("msg".to_string()).await;
assert!(result.is_err());
assert_eq!(result.unwrap_err().to_string(), "Rate limit exceeded");
});
}
#[test]
pub fn test_priority_channel_capacity() {
smol::block_on(async {
let channel: specialist::PriorityChannel<String> = specialist::PriorityChannel::new(1);
channel.send_high("high1".to_string()).await.unwrap();
channel.send_normal("normal1".to_string()).await.unwrap();
channel.send_low("low1".to_string()).await.unwrap();
let received = channel.recv().await.unwrap();
assert_eq!(received, "high1");
});
}
#[test]
pub fn test_compressed_channel_large_data() {
smol::block_on(async {
let channel = specialist::CompressedChannel::new();
let large_data = "A".repeat(10000); channel.send_compressed(&large_data).await.unwrap();
let received: String = channel.recv_decompressed().await.unwrap();
assert_eq!(received, large_data);
});
}