use crate::consumer::Consumer;
use crate::queue::Queue;
use crate::record::{ErrorQueue, Mode, MsgType};
use std::time::Duration;
use std::{fs, thread};
/// Asserts that `received_numbers` is a run of consecutive integers, i.e. that
/// every element is exactly one greater than the element before it.
///
/// The slice is printed before it is checked. Empty and single-element slices
/// pass trivially because they contain no adjacent pair to compare.
///
/// # Panics
///
/// Panics at the first adjacent pair that is not consecutive.
fn check_message_integrity(received_numbers: &[i32]) {
    println!("{:?}", received_numbers);
    // Each window is `[previous, current]`.
    for pair in received_numbers.windows(2) {
        assert_eq!(pair[1], pair[0] + 1);
    }
}
/// A queue opened with `Mode::Read` must refuse `push` with
/// `ErrorQueue::NotReady`, while the messages written earlier through a
/// `Mode::ReadWrite` handle are still delivered, in order, to a consumer.
#[test]
fn test_read_only_mode() {
    let queue_name = "test_queue";
    let base_path = create_unique_queue_path("./test-tmp", "queue");
    let num_messages = 5;

    // Fill the queue through a writable handle; the handle is dropped when
    // this scope ends, before the read-only handle is opened.
    {
        let mut writer = Queue::new(&base_path, queue_name, Mode::ReadWrite).unwrap();
        for i in 0..num_messages {
            let text = format!("Message {}", i);
            writer.push(text.as_bytes(), MsgType::String).unwrap();
        }
    }

    // Reopen read-only: pushing has to be rejected.
    let mut reader = Queue::new(&base_path, queue_name, Mode::Read).unwrap();
    let push_result = reader.push("Hello, world!".as_bytes(), MsgType::String);
    assert!(push_result.is_err());
    assert_eq!(push_result.unwrap_err(), ErrorQueue::NotReady);

    // Drain everything a fresh consumer is handed.
    let mut consumer = Consumer::new(&base_path, "consumer", queue_name).unwrap();
    let mut received_messages = Vec::new();
    while consumer.pop_header() {
        // The header carries the body length; size the buffer from it.
        let mut body = vec![0; consumer.header.msg_length as usize];
        if consumer.pop_body(&mut body).is_err() {
            break;
        }
        received_messages.push(String::from_utf8(body).unwrap());
        consumer.commit();
    }

    // Same count and same contents, in push order.
    assert_eq!(received_messages.len(), num_messages as usize);
    let expected = (0..num_messages).map(|i| format!("Message {}", i));
    for (actual, wanted) in received_messages.iter().zip(expected) {
        assert_eq!(actual, &wanted);
    }
}
/// Several differently named consumers read from one queue, first after an
/// initial batch of pushes and then again after the queue handle has been
/// dropped, reopened and extended with a second batch.
///
/// NOTE(review): each batch is only checked for consecutiveness via
/// `check_message_integrity`, so a consumer that receives nothing also
/// passes. Asserting exact contents would need the intended delivery
/// semantics confirmed first.
#[test]
fn test_queue_consumer_interaction() {
    let queue_name = "test_queue";
    let base_path = create_unique_queue_path("./test-tmp", "queue");

    // First batch: the numbers 0..num_messages as decimal strings.
    let mut queue = Queue::new(&base_path, queue_name, Mode::ReadWrite).unwrap();
    let num_messages = 10;
    for i in 0..num_messages {
        let payload = format!("{}", i);
        queue.push(payload.as_bytes(), MsgType::String).unwrap();
    }

    let num_consumers = 3;
    let mut consumers: Vec<_> = (0..num_consumers)
        .map(|i| Consumer::new(&base_path, &format!("consumer_{}", i), queue_name).unwrap())
        .collect();

    // Round one: every consumer drains what it is offered into its own list.
    let mut received_numbers = vec![Vec::new(); num_consumers];
    for (consumer, numbers) in consumers.iter_mut().zip(received_numbers.iter_mut()) {
        while consumer.pop_header() {
            let mut body = vec![0; consumer.header.msg_length as usize];
            if consumer.pop_body(&mut body).is_err() {
                break;
            }
            let text = String::from_utf8(body).unwrap();
            numbers.push(text.parse::<i32>().unwrap());
            consumer.commit();
        }
    }
    for numbers in &received_numbers {
        check_message_integrity(numbers);
    }

    // Close and reopen the queue, then append the second batch.
    drop(queue);
    let mut queue = Queue::new(&base_path, queue_name, Mode::ReadWrite).unwrap();
    let num_new_messages = 5;
    for i in num_messages..num_messages + num_new_messages {
        let payload = format!("{}", i);
        queue.push(payload.as_bytes(), MsgType::String).unwrap();
    }
    // NOTE(review): presumably gives the new data time to become visible to
    // the existing consumers; confirm whether this pause is actually needed.
    thread::sleep(Duration::from_millis(100));

    // Round two: the same (still open) consumers drain again.
    let mut new_received_numbers = vec![Vec::new(); num_consumers];
    for (consumer, numbers) in consumers.iter_mut().zip(new_received_numbers.iter_mut()) {
        while consumer.pop_header() {
            let mut body = vec![0; consumer.header.msg_length as usize];
            if consumer.pop_body(&mut body).is_err() {
                break;
            }
            let text = String::from_utf8(body).unwrap();
            numbers.push(text.parse::<i32>().unwrap());
            consumer.commit();
        }
    }
    for numbers in &new_received_numbers {
        check_message_integrity(numbers);
    }
}
/// Reads part of a queue, drops the consumer, then creates a new consumer
/// under the same name and drains whatever it is handed.
///
/// NOTE(review): the final check only verifies that the numbers read after
/// reconnecting are consecutive; it does not pin where the sequence starts,
/// and an empty result also passes. Confirm the intended resume position
/// before tightening it.
#[test]
fn test_consumer_reconnect() {
    println!("test_consumer_reconnect");
    let queue_name = "test_queue";
    let consumer_name = "consumer";
    let base_path = create_unique_queue_path("./test-tmp", "queue");

    let mut queue = Queue::new(&base_path, queue_name, Mode::ReadWrite).unwrap();
    let num_messages = 10;
    for i in 0..num_messages {
        let payload = format!("{}", i);
        println!("push {}", payload);
        queue.push(payload.as_bytes(), MsgType::String).unwrap();
    }

    // First session: attempt a fixed number of reads, then let the consumer
    // go out of scope so the second session starts from a fresh instance.
    {
        let mut consumer = Consumer::new(&base_path, consumer_name, queue_name).unwrap();
        let num_read_messages = 5;
        for _ in 0..num_read_messages {
            if !consumer.pop_header() {
                continue;
            }
            let mut body = vec![0; consumer.header.msg_length as usize];
            consumer.pop_body(&mut body).unwrap();
            consumer.commit();
        }
    }

    // Second session: same consumer name, drain until nothing more is offered.
    let mut consumer = Consumer::new(&base_path, consumer_name, queue_name).unwrap();
    let mut received_numbers = Vec::new();
    while consumer.pop_header() {
        let mut body = vec![0; consumer.header.msg_length as usize];
        if consumer.pop_body(&mut body).is_err() {
            break;
        }
        let text = String::from_utf8(body).unwrap();
        received_numbers.push(text.parse::<i32>().unwrap());
        consumer.commit();
    }
    check_message_integrity(&received_numbers);
}
use uuid::Uuid;
/// Builds a fresh, collision-free directory path for a test queue.
///
/// The returned string has the form `{base_path}/{prefix}_{uuid}`, where the
/// UUID is a newly generated random (v4) one. This function does not create
/// anything on disk.
///
/// Only the unique path itself is cleared (best effort). The shared
/// `base_path` must not be removed here: the test harness runs tests in
/// parallel by default, and wiping the common parent directory would delete
/// the queue files of other tests that are still running.
fn create_unique_queue_path(base_path: &str, prefix: &str) -> String {
    let uuid = Uuid::new_v4().to_string();
    let path = format!("{}/{}_{}", base_path, prefix, uuid);
    // Errors (typically "not found") are deliberately ignored.
    fs::remove_dir_all(&path).unwrap_or_default();
    path
}
/// A consumer attached to a queue that has had nothing pushed to it must
/// report that no header is available.
#[test]
fn test_queue_empty() {
    let queue_name = "test_queue";
    let base_path = create_unique_queue_path("./test-tmp", "queue");

    // Bound to a named placeholder (not `_`) so the queue stays open until
    // the end of the test.
    let _queue = Queue::new(&base_path, queue_name, Mode::ReadWrite).unwrap();

    let mut consumer = Consumer::new(&base_path, "consumer", queue_name).unwrap();
    assert!(!consumer.pop_header());
}
/// Two queues sharing one base path are written with the same payloads and
/// read back through separate consumers, one per queue.
///
/// NOTE(review): the result of each consumer is only checked for
/// consecutiveness, so an empty result also passes.
#[test]
fn test_multiple_queues() {
    let base_path = create_unique_queue_path("./test-tmp", "queue");
    let (queue_name_1, queue_name_2) = ("test_queue_1", "test_queue_2");

    let mut queue_1 = Queue::new(&base_path, queue_name_1, Mode::ReadWrite).unwrap();
    let mut queue_2 = Queue::new(&base_path, queue_name_2, Mode::ReadWrite).unwrap();

    // Push the same number to both queues, alternating between them.
    let num_messages = 5;
    for i in 0..num_messages {
        let payload = format!("{}", i);
        queue_1.push(payload.as_bytes(), MsgType::String).unwrap();
        queue_2.push(payload.as_bytes(), MsgType::String).unwrap();
    }

    let mut consumer_1 = Consumer::new(&base_path, "consumer_1", queue_name_1).unwrap();
    let mut consumer_2 = Consumer::new(&base_path, "consumer_2", queue_name_2).unwrap();
    let mut received_numbers_1 = Vec::new();
    let mut received_numbers_2 = Vec::new();

    // Drain consumer 1 completely, then consumer 2, each into its own list.
    for (consumer, numbers) in [
        (&mut consumer_1, &mut received_numbers_1),
        (&mut consumer_2, &mut received_numbers_2),
    ] {
        while consumer.pop_header() {
            let mut body = vec![0; consumer.header.msg_length as usize];
            if consumer.pop_body(&mut body).is_err() {
                break;
            }
            let text = String::from_utf8(body).unwrap();
            numbers.push(text.parse::<i32>().unwrap());
            consumer.commit();
        }
    }

    check_message_integrity(&received_numbers_1);
    check_message_integrity(&received_numbers_2);
}