use crate::consumer::Consumer;
use crate::queue::Queue;
use crate::record::{ErrorQueue, Mode, MsgType};
use std::fs::OpenOptions;
use std::io::{Seek, SeekFrom};
use std::time::Duration;
use std::{fs, thread};
fn check_message_integrity(received_numbers: &[i32]) {
println!("{:?}", received_numbers);
for i in 1..received_numbers.len() {
assert_eq!(received_numbers[i], received_numbers[i - 1] + 1);
}
}
#[test]
fn test_read_only_mode() {
let base_path = create_unique_queue_path("./test-tmp", "queue");
let queue_name = "test_queue";
let mut queue = Queue::new(&base_path, queue_name, Mode::ReadWrite).unwrap();
let num_messages = 5;
for i in 0..num_messages {
let msg = format!("Message {}", i);
queue.push(msg.as_bytes(), MsgType::String).unwrap();
}
drop(queue);
let mut queue = Queue::new(&base_path, queue_name, Mode::Read).unwrap();
let message = "Hello, world!".as_bytes();
let msg_type = MsgType::String;
let result = queue.push(message, msg_type);
assert!(result.is_err());
assert_eq!(result.unwrap_err(), ErrorQueue::NotReady);
let consumer_name = "consumer";
let mut consumer = Consumer::new(&base_path, consumer_name, queue_name).unwrap();
let mut received_messages = Vec::new();
while consumer.pop_header() {
let msg_size = consumer.header.msg_length as usize;
let mut msg = vec![0; msg_size];
if consumer.pop_body(&mut msg).is_ok() {
let message = String::from_utf8(msg).unwrap();
received_messages.push(message);
consumer.commit();
} else {
break;
}
}
assert_eq!(received_messages.len(), num_messages as usize);
for (i, msg) in received_messages.iter().enumerate() {
assert_eq!(msg, &format!("Message {}", i));
}
}
#[test]
fn test_queue_consumer_interaction() {
let base_path = create_unique_queue_path("./test-tmp", "queue");
let queue_name = "test_queue";
let mut queue = Queue::new(&base_path, queue_name, Mode::ReadWrite).unwrap();
let num_messages = 10;
for i in 0..num_messages {
let msg = format!("{}", i);
queue.push(msg.as_bytes(), MsgType::String).unwrap();
}
let num_consumers = 3;
let mut consumers = Vec::new();
for i in 0..num_consumers {
let consumer_name = format!("consumer_{}", i);
let consumer = Consumer::new(&base_path, &consumer_name, queue_name).unwrap();
consumers.push(consumer);
}
let mut received_numbers = vec![Vec::new(); num_consumers];
for (i, consumer) in consumers.iter_mut().enumerate() {
while consumer.pop_header() {
let msg_size = consumer.header.msg_length as usize;
let mut msg = vec![0; msg_size];
if consumer.pop_body(&mut msg).is_ok() {
let number = String::from_utf8(msg).unwrap().parse::<i32>().unwrap();
received_numbers[i].push(number);
consumer.commit();
} else {
break;
}
}
}
for numbers in received_numbers.iter() {
check_message_integrity(numbers);
}
drop(queue);
let mut queue = Queue::new(&base_path, queue_name, Mode::ReadWrite).unwrap();
let num_new_messages = 5;
for i in num_messages..num_messages + num_new_messages {
let msg = format!("{}", i);
queue.push(msg.as_bytes(), MsgType::String).unwrap();
}
thread::sleep(Duration::from_millis(100));
let mut new_received_numbers = vec![Vec::new(); num_consumers];
for (i, consumer) in consumers.iter_mut().enumerate() {
while consumer.pop_header() {
let msg_size = consumer.header.msg_length as usize;
let mut msg = vec![0; msg_size];
if consumer.pop_body(&mut msg).is_ok() {
let number = String::from_utf8(msg).unwrap().parse::<i32>().unwrap();
new_received_numbers[i].push(number);
consumer.commit();
} else {
break;
}
}
}
for numbers in new_received_numbers.iter() {
check_message_integrity(numbers);
}
}
#[test]
fn test_consumer_reconnect() {
println!("test_consumer_reconnect");
let base_path = create_unique_queue_path("./test-tmp", "queue");
let queue_name = "test_queue";
let mut queue = Queue::new(&base_path, queue_name, Mode::ReadWrite).unwrap();
let num_messages = 10;
for i in 0..num_messages {
let msg = format!("{}", i);
println!("push {}", msg);
queue.push(msg.as_bytes(), MsgType::String).unwrap();
}
let consumer_name = "consumer";
let mut consumer = Consumer::new(&base_path, consumer_name, queue_name).unwrap();
let num_read_messages = 5;
for _ in 0..num_read_messages {
if consumer.pop_header() {
let msg_size = consumer.header.msg_length as usize;
let mut msg = vec![0; msg_size];
consumer.pop_body(&mut msg).unwrap();
consumer.commit();
}
}
drop(consumer);
let mut consumer = Consumer::new(&base_path, consumer_name, queue_name).unwrap();
let mut received_numbers = Vec::new();
while consumer.pop_header() {
let msg_size = consumer.header.msg_length as usize;
let mut msg = vec![0; msg_size];
if consumer.pop_body(&mut msg).is_ok() {
let number = String::from_utf8(msg).unwrap().parse::<i32>().unwrap();
received_numbers.push(number);
consumer.commit();
} else {
break;
}
}
check_message_integrity(&received_numbers);
}
use uuid::Uuid;
fn create_unique_queue_path(base_path: &str, prefix: &str) -> String {
let uuid = Uuid::new_v4().to_string();
let path = format!("{}/{}_{}", base_path, prefix, uuid);
fs::remove_dir_all(&base_path).unwrap_or_default();
path
}
#[test]
fn test_queue_empty() {
let base_path = create_unique_queue_path("./test-tmp", "queue");
let queue_name = "test_queue";
let _queue = Queue::new(&base_path, queue_name, Mode::ReadWrite).unwrap();
let consumer_name = "consumer";
let mut consumer = Consumer::new(&base_path, consumer_name, queue_name).unwrap();
assert!(!consumer.pop_header());
}
#[test]
fn test_multiple_queues() {
let base_path = create_unique_queue_path("./test-tmp", "queue");
let queue_name_1 = "test_queue_1";
let queue_name_2 = "test_queue_2";
let mut queue_1 = Queue::new(&base_path, queue_name_1, Mode::ReadWrite).unwrap();
let mut queue_2 = Queue::new(&base_path, queue_name_2, Mode::ReadWrite).unwrap();
let num_messages = 5;
for i in 0..num_messages {
let msg = format!("{}", i);
queue_1.push(msg.as_bytes(), MsgType::String).unwrap();
queue_2.push(msg.as_bytes(), MsgType::String).unwrap();
}
let consumer_name_1 = "consumer_1";
let consumer_name_2 = "consumer_2";
let mut consumer_1 = Consumer::new(&base_path, consumer_name_1, queue_name_1).unwrap();
let mut consumer_2 = Consumer::new(&base_path, consumer_name_2, queue_name_2).unwrap();
let mut received_numbers_1 = Vec::new();
let mut received_numbers_2 = Vec::new();
while consumer_1.pop_header() {
let msg_size = consumer_1.header.msg_length as usize;
let mut msg = vec![0; msg_size];
if consumer_1.pop_body(&mut msg).is_ok() {
let number = String::from_utf8(msg).unwrap().parse::<i32>().unwrap();
received_numbers_1.push(number);
consumer_1.commit();
} else {
break;
}
}
while consumer_2.pop_header() {
let msg_size = consumer_2.header.msg_length as usize;
let mut msg = vec![0; msg_size];
if consumer_2.pop_body(&mut msg).is_ok() {
let number = String::from_utf8(msg).unwrap().parse::<i32>().unwrap();
received_numbers_2.push(number);
consumer_2.commit();
} else {
break;
}
}
check_message_integrity(&received_numbers_1);
check_message_integrity(&received_numbers_2);
}
#[test]
fn test_push_rollback_on_write_error() {
let base_path = create_unique_queue_path("./test-tmp", "queue");
let queue_name = "test_rollback";
let mut queue = Queue::new(&base_path, queue_name, Mode::ReadWrite).unwrap();
queue.push(b"valid_message", MsgType::String).expect("first push must succeed");
let saved_right_edge = queue.right_edge;
let saved_count = queue.count_pushed;
let queue_file_path = format!("{}/{}-{}/{}_queue", base_path, queue_name, queue.id, queue_name);
let saved_file_size = fs::metadata(&queue_file_path).unwrap().len();
let dev_full = match OpenOptions::new().write(true).open("/dev/full") {
Ok(f) => f,
Err(_) => {
eprintln!("test_push_rollback_on_write_error: skipped, /dev/full not available");
return;
},
};
let original_handle = std::mem::replace(&mut queue.ff_queue, dev_full);
let result = queue.push(b"this push will fail", MsgType::String);
assert!(result.is_err(), "push to /dev/full must return Err, got {:?}", result);
assert_eq!(queue.right_edge, saved_right_edge, "right_edge must not advance past the last successful push");
assert_eq!(queue.count_pushed, saved_count, "count_pushed must not advance past the last successful push");
queue.ff_queue = original_handle;
queue.ff_queue.seek(SeekFrom::Start(saved_right_edge)).unwrap();
let file_size_after = fs::metadata(&queue_file_path).unwrap().len();
assert_eq!(file_size_after, saved_file_size, "queue file must not change while writes were redirected to /dev/full");
queue.push(b"recovery_message", MsgType::String).expect("post-recovery push must succeed");
drop(queue);
let mut consumer = Consumer::new(&base_path, "rollback_consumer", queue_name).unwrap();
let mut received = Vec::new();
while consumer.pop_header() {
let mut msg = vec![0; consumer.header.msg_length as usize];
consumer.pop_body(&mut msg).unwrap();
received.push(String::from_utf8(msg).unwrap());
consumer.commit();
}
assert_eq!(received, vec!["valid_message".to_string(), "recovery_message".to_string()]);
}
#[test]
fn test_queue_new_fails_on_corrupted_info_push_crc() {
let base_path = create_unique_queue_path("./test-tmp", "queue");
let queue_name = "test_info_push_corrupt";
{
let mut q = Queue::new(&base_path, queue_name, Mode::ReadWrite).unwrap();
for i in 0..3 {
q.push(format!("msg{}", i).as_bytes(), MsgType::String).unwrap();
}
}
let info_push_path = format!("{}/{}-0/{}_info_push", base_path, queue_name, queue_name);
let mut content = fs::read(&info_push_path).expect("info_push must exist");
let len = content.len();
assert!(len > 1 && content[len - 1] == b'\n', "info_push must end with newline");
let target = len - 2; content[target] = if content[target] == b'9' { b'0' } else { content[target] + 1 };
fs::write(&info_push_path, &content).unwrap();
match Queue::new(&base_path, queue_name, Mode::Read) {
Err(e) => assert_eq!(e, ErrorQueue::InvalidChecksum),
Ok(_) => panic!("Queue::new(Read) must return Err when info_push CRC is corrupted, got Ok"),
}
}
#[test]
fn test_queue_new_fails_on_corrupted_info_queue_crc() {
let base_path = create_unique_queue_path("./test-tmp", "queue");
let queue_name = "test_info_queue_corrupt";
{
let mut q = Queue::new(&base_path, queue_name, Mode::ReadWrite).unwrap();
q.push(b"msg", MsgType::String).unwrap();
}
let info_queue_path = format!("{}/{}_info_queue", base_path, queue_name);
let mut content = fs::read(&info_queue_path).expect("info_queue must exist");
let len = content.len();
assert!(len > 1 && content[len - 1] == b'\n', "info_queue must end with newline");
let target = len - 2;
content[target] = if content[target] == b'9' { b'0' } else { content[target] + 1 };
fs::write(&info_queue_path, &content).unwrap();
match Queue::new(&base_path, queue_name, Mode::Read) {
Err(e) => assert_eq!(e, ErrorQueue::InvalidChecksum),
Ok(_) => panic!("Queue::new(Read) must return Err when info_queue CRC is corrupted, got Ok"),
}
}
#[test]
fn test_double_open_returns_already_open() {
let base_path = create_unique_queue_path("./test-tmp", "queue");
let queue_name = "test_double_open";
let q1 = Queue::new(&base_path, queue_name, Mode::ReadWrite).unwrap();
match Queue::new(&base_path, queue_name, Mode::ReadWrite) {
Err(e) => assert_eq!(e, ErrorQueue::AlreadyOpen, "expected AlreadyOpen, got {:?}", e),
Ok(_) => panic!("second Queue::new(ReadWrite) must fail while first instance is alive"),
}
if Queue::new(&base_path, queue_name, Mode::Read).is_err() {
panic!("Queue::new(Read) must succeed alongside an active ReadWrite handle");
}
drop(q1);
if Queue::new(&base_path, queue_name, Mode::ReadWrite).is_err() {
panic!("re-open in ReadWrite mode must succeed after the first instance was dropped");
}
}
#[test]
fn test_consumer_reconnect_arbitrary_queue_name() {
let base_path = create_unique_queue_path("./test-tmp", "queue");
let queue_name = "arbitrary_queue_name";
{
let mut q = Queue::new(&base_path, queue_name, Mode::ReadWrite).unwrap();
for i in 0..5 {
q.push(format!("m{}", i).as_bytes(), MsgType::String).unwrap();
}
}
{
let mut c = Consumer::new(&base_path, "consumer_x", queue_name).unwrap();
for _ in 0..2 {
assert!(c.pop_header(), "pop_header must succeed on first session");
let mut msg = vec![0; c.header.msg_length as usize];
c.pop_body(&mut msg).unwrap();
assert!(c.commit(), "commit must succeed");
}
}
let mut c = Consumer::new(&base_path, "consumer_x", queue_name).unwrap();
let mut received = Vec::new();
while c.pop_header() {
let mut msg = vec![0; c.header.msg_length as usize];
c.pop_body(&mut msg).unwrap();
received.push(String::from_utf8(msg).unwrap());
c.commit();
}
assert_eq!(received, vec!["m2".to_string(), "m3".to_string(), "m4".to_string()]);
}