use super::*;
use std::collections::HashSet;
use std::sync::atomic::AtomicBool;
use std::sync::Barrier;
use std::thread;
#[derive(Clone, Debug, PartialEq)]
struct TestStruct {
test: u32,
test2: u64,
}
#[test]
fn basic_topic_pub_sub_test() {
let test_topic = create_topic::<TestStruct>("test_struct_basic_poll".to_string(), 16).unwrap();
let publisher_struct = test_topic.create_publisher();
publisher_struct.publish(TestStruct { test: 1, test2: 2 });
assert_eq!(test_topic.generation.load(Ordering::SeqCst), 1);
let publisher = test_topic.create_publisher();
publisher.publish(TestStruct {
test: 42,
test2: 43,
});
assert_eq!(test_topic.generation.load(Ordering::SeqCst), 2);
let mut sub = test_topic.create_subscriber();
assert!(sub.check_update());
let data = sub.check_update_and_copy();
assert!(data.is_some());
let data = data.unwrap();
assert_eq!(data.test, 1);
assert_eq!(data.test2, 2);
assert!(sub.check_update());
let data = sub.check_update_and_copy();
assert!(data.is_some());
let data = data.unwrap();
assert_eq!(data.test, 42);
assert_eq!(data.test2, 43);
}
#[test]
fn topic_test() {
let topic: Topic<u16> = Topic::new("test".to_string(), 16, 1);
assert_eq!(topic.name(), "test");
assert_eq!(topic.generation.load(Ordering::SeqCst), 0);
assert_eq!(topic.token(), Token(1));
}
#[test]
fn topic_poller_add_topic_test() {
let mut poller = TopicPoller::new();
let topic: Topic<u32> = Topic::new("test_poll".to_string(), 16, 2);
let result = poller.add_topic(&topic);
assert!(result.is_ok());
}
#[test]
fn topic_poller_wait_test() {
let mut poller = TopicPoller::new();
let reg_topic: Topic<u32> = Topic::new("test_wait".to_string(), 16, 3);
poller.add_topic(®_topic).unwrap();
let result = poller.wait(Some(Duration::from_millis(10)));
assert!(result.is_ok());
}
#[test]
fn topic_poller_notification_test() {
let mut poller = TopicPoller::new();
let topic = Arc::new(Topic::new("test_notify".to_string(), 16, 4));
poller.add_topic(&topic).unwrap();
let topic_clone = topic.clone();
let handle = thread::spawn(move || {
thread::sleep(Duration::from_millis(50));
let publisher = topic_clone.create_publisher();
publisher.publish(42u32);
});
let result = poller.wait(Some(Duration::from_millis(200)));
assert!(result.is_ok());
for i in poller.iter() {
assert_eq!(i, topic.token());
}
handle.join().unwrap();
}
#[test]
fn topic_notify_and_clear_test() {
let topic: Topic<u32> = Topic::new("test_clear".to_string(), 16, 5);
topic.notify();
topic.clear_event();
topic.notify();
topic.clear_event();
}
#[test]
fn topic_poller_iter_test() {
let mut poller = TopicPoller::new();
let topic1: Topic<u32> = Topic::new("test_iter1".to_string(), 16, 6);
let topic2: Topic<u32> = Topic::new("test_iter2".to_string(), 16, 7);
poller.add_topic(&topic1).unwrap();
poller.add_topic(&topic2).unwrap();
poller.wait(Some(Duration::from_millis(10))).unwrap();
let tokens: Vec<Token> = poller.iter().collect();
assert!(tokens.is_empty());
}
#[test]
fn lagging_subscriber_should_not_emit_more_items_than_queue_can_retain() {
let topic = Arc::new(Topic::new("test_lagging_subscriber".to_string(), 2, 8));
let publisher = topic.create_publisher();
let mut subscriber = topic.create_subscriber();
for value in 0_u32..4 {
publisher.publish(value);
}
let mut received = Vec::new();
while subscriber.check_update() {
received.push(subscriber.check_update_and_copy().unwrap());
}
assert_eq!(
received,
vec![2, 3],
"a subscriber that falls behind the ring buffer should only observe the retained tail"
);
}
#[test]
fn concurrent_publishers_should_not_drop_or_duplicate_messages() {
const PRODUCERS: usize = 8;
const MESSAGES_PER_PRODUCER: usize = 250;
let topic = Arc::new(Topic::new(
"test_concurrent_publishers".to_string(),
(PRODUCERS * MESSAGES_PER_PRODUCER) as u16,
9,
));
let barrier = Arc::new(Barrier::new(PRODUCERS));
let mut handles = Vec::new();
for producer_id in 0..PRODUCERS {
let topic = topic.clone();
let barrier = barrier.clone();
handles.push(thread::spawn(move || {
let publisher = topic.create_publisher();
barrier.wait();
for seq in 0..MESSAGES_PER_PRODUCER {
publisher.publish((producer_id * MESSAGES_PER_PRODUCER + seq) as u32);
}
}));
}
for handle in handles {
handle.join().unwrap();
}
let mut subscriber = topic.create_subscriber();
let mut received = Vec::new();
while subscriber.check_update() {
if let Some(data) = subscriber.check_update_and_copy() {
received.push(data);
}
}
let unique: HashSet<_> = received.iter().copied().collect();
assert_eq!(
received.len(),
PRODUCERS * MESSAGES_PER_PRODUCER,
"every published message should be readable exactly once"
);
assert_eq!(
unique.len(),
PRODUCERS * MESSAGES_PER_PRODUCER,
"concurrent publishers produced duplicate or lost messages"
);
}
#[test]
fn subscriber_read_after_new_generation_observes_committed_data() {
let topic = Arc::new(Topic::new("test_generation_visibility".to_string(), 4, 10));
let writer_has_published_generation = Arc::new(AtomicBool::new(false));
let writer_may_unlock_fifo = Arc::new(Barrier::new(2));
let topic_for_writer = topic.clone();
let published_flag = writer_has_published_generation.clone();
let unlock_barrier = writer_may_unlock_fifo.clone();
let handle = thread::spawn(move || {
let mut fifo = topic_for_writer.fifo.lock().unwrap();
fifo[0] = Some(123_u32);
topic_for_writer.generation.store(1, Ordering::Release);
published_flag.store(true, Ordering::Release);
unlock_barrier.wait();
});
while !writer_has_published_generation.load(Ordering::Acquire) {
thread::yield_now();
}
let mut subscriber = topic.create_subscriber();
assert!(subscriber.check_update());
writer_may_unlock_fifo.wait();
let data = subscriber.check_update_and_copy();
assert_eq!(data, Some(123));
handle.join().unwrap();
}
#[test]
fn create_topic_with_existing_name_returns_error() {
let topic_name = "test_duplicate_topic".to_string();
let result1 = create_topic::<u32>(topic_name.clone(), 16);
assert!(result1.is_ok());
let result2 = create_topic::<u32>(topic_name, 16);
assert!(result2.is_err());
}
#[test]
fn duplicate_topic_creation_does_not_replace_existing_topic() {
let topic_name = "test_no_replace_topic".to_string();
let topic1 = create_topic::<u32>(topic_name.clone(), 16).unwrap();
let publisher1 = topic1.create_publisher();
publisher1.publish(123);
let result2 = create_topic::<u32>(topic_name.clone(), 16);
assert!(result2.is_err());
let topic_from_get = get_topic::<u32>(&topic_name).unwrap();
let mut subscriber = topic1.create_subscriber();
assert_eq!(subscriber.check_update_and_copy(), Some(123));
let publisher_from_get = topic_from_get.create_publisher();
publisher_from_get.publish(456);
let mut subscriber2 = topic_from_get.create_subscriber();
assert_eq!(subscriber2.check_update_and_copy(), Some(123));
assert_eq!(subscriber2.check_update_and_copy(), Some(456));
}
#[test]
fn topic_num_not_incremented_on_duplicate() {
let topic_name1 = "test_topic_num_1".to_string();
let topic_name2 = "test_topic_num_2".to_string();
let topic_name3 = "test_topic_num_3".to_string();
let topic1 = create_topic::<u32>(topic_name1.clone(), 16).unwrap();
let token1 = topic1.token();
let _ = create_topic::<u32>(topic_name1, 16);
let topic2 = create_topic::<u32>(topic_name2, 16).unwrap();
let token2 = topic2.token();
assert_eq!(usize::from(token2), usize::from(token1) + 1);
let _ = create_topic::<u32>(topic_name3.clone(), 16).unwrap();
let _ = create_topic::<u32>(topic_name3, 16);
let topic3 = create_topic::<u32>("test_topic_num_final".to_string(), 16).unwrap();
assert_eq!(usize::from(topic3.token()), usize::from(token2) + 2);
}