use chrono::Utc;
use qrusty::api::StorageApi;
use qrusty::memory_storage::MemoryStorage;
use qrusty::message::{Message, Priority, PriorityOrdering, QueueConfig};
use qrusty::storage::RenameQueueError;
/// Builds a fresh, unlocked [`Message`] with a numeric priority for use in tests.
///
/// The message gets a random UUID as its id, `created_at` set to the current
/// UTC time, a retry count of zero, a retry budget of three, and no payload
/// reference or hash.
fn create_test_message(queue: &str, priority: u64, payload: &str) -> Message {
    let id = uuid::Uuid::new_v4().to_string();
    let created_at = Utc::now();
    Message {
        id,
        queue: queue.to_owned(),
        priority: Priority::Numeric(priority),
        payload: payload.to_owned(),
        created_at,
        locked_until: None,
        locked_by: None,
        retry_count: 0,
        max_retries: 3,
        payload_ref: None,
        payload_hash: None,
    }
}
/// Installs a DEBUG-level tracing subscriber that writes through the test
/// writer.
///
/// The outcome of `try_init` is deliberately discarded, presumably because
/// another test in the same process may already have installed a subscriber
/// (confirm against the `tracing_subscriber` docs).
fn init_tracing() {
    let builder = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .with_test_writer();
    let _ = builder.try_init();
}
/// Pushing a message returns its id, and popping it hands the same message
/// back locked by the requesting consumer with `retry_count` bumped to 1.
#[tokio::test]
async fn test_memory_push_and_pop() {
    init_tracing();
    let storage = MemoryStorage::new();

    let message = create_test_message("q1", 100, r#"{"hello":"world"}"#);
    let expected_id = message.id.clone();

    let pushed_id = storage.push(message).await.unwrap();
    assert_eq!(pushed_id, expected_id);

    let maybe_msg = storage.pop("q1", "consumer1", 30).await.unwrap();
    assert!(maybe_msg.is_some());

    let delivered = maybe_msg.unwrap();
    assert_eq!(delivered.id, expected_id);
    assert_eq!(delivered.retry_count, 1);
    assert!(delivered.locked_until.is_some());
    assert_eq!(delivered.locked_by.as_deref(), Some("consumer1"));
}
/// With `MaxFirst` ordering, messages are popped from highest to lowest
/// numeric priority regardless of push order.
#[tokio::test]
async fn test_memory_priority_ordering_max_first() {
    init_tracing();
    let storage = MemoryStorage::new();

    let config = QueueConfig {
        ordering: PriorityOrdering::MaxFirst,
        ..Default::default()
    };
    storage.create_queue("pq", config).await;

    // Deliberately pushed out of priority order.
    let fixtures = [(50, "mid"), (100, "high"), (10, "low")];
    for (prio, payload) in fixtures {
        let msg = create_test_message("pq", prio, payload);
        storage.push(msg).await.unwrap();
    }

    // Drain everything first, then compare the observed order in one go.
    let mut drained = Vec::new();
    for _ in 0..3 {
        let msg = storage.pop("pq", "c", 30).await.unwrap().unwrap();
        drained.push(msg.payload);
    }
    assert_eq!(drained, vec!["high", "mid", "low"]);
}
/// With `MinFirst` ordering, messages are popped from lowest to highest
/// numeric priority regardless of push order.
#[tokio::test]
async fn test_memory_priority_ordering_min_first() {
    init_tracing();
    let storage = MemoryStorage::new();

    let min_first = QueueConfig {
        ordering: PriorityOrdering::MinFirst,
        ..Default::default()
    };
    storage.create_queue("mq", min_first).await;

    for (prio, payload) in [(50, "mid"), (100, "high"), (10, "low")] {
        let msg = create_test_message("mq", prio, payload);
        storage.push(msg).await.unwrap();
    }

    let first = storage.pop("mq", "c", 30).await.unwrap().unwrap();
    let second = storage.pop("mq", "c", 30).await.unwrap().unwrap();
    let third = storage.pop("mq", "c", 30).await.unwrap().unwrap();

    let observed = vec![first.payload, second.payload, third.payload];
    assert_eq!(observed, vec!["low", "mid", "high"]);
}
/// With `Fifo` ordering, messages come out in `created_at` order even though
/// their numeric priorities run in the opposite direction.
#[tokio::test]
async fn test_memory_fifo_ordering() {
    init_tracing();
    let storage = MemoryStorage::new();

    let fifo = QueueConfig {
        ordering: PriorityOrdering::Fifo,
        ..Default::default()
    };
    storage.create_queue("fq", fifo).await;

    for seq in 0..3 {
        // Priorities descend (200, 100, 0) while timestamps ascend.
        let priority = (2 - seq) * 100;
        let payload = format!("msg{}", seq);
        let mut msg = create_test_message("fq", priority, &payload);

        let offset_ms = seq as i64 * 10;
        msg.created_at = Utc::now() + chrono::Duration::milliseconds(offset_ms);
        storage.push(msg).await.unwrap();
    }

    let mut drained = Vec::new();
    for _ in 0..3 {
        let msg = storage.pop("fq", "c", 30).await.unwrap().unwrap();
        drained.push(msg.payload);
    }
    assert_eq!(drained, vec!["msg0", "msg1", "msg2"]);
}
/// Acknowledging a popped message reports success and leaves nothing to pop.
#[tokio::test]
async fn test_memory_ack_removes() {
    init_tracing();
    let storage = MemoryStorage::new();

    let message = create_test_message("q", 1, "data");
    let id = message.id.clone();
    storage.push(message).await.unwrap();

    // Lock the message for c1 before acknowledging it.
    storage.pop("q", "c1", 30).await.unwrap().unwrap();
    let was_acked = storage.ack("q", &id, "c1").await.unwrap();
    assert!(was_acked);

    let leftover = storage.pop("q", "c1", 30).await.unwrap();
    assert!(leftover.is_none());
}
/// A nacked message can be popped again by another consumer, and the second
/// delivery carries `retry_count == 2`.
#[tokio::test]
async fn test_memory_nack_unlocks() {
    init_tracing();
    let storage = MemoryStorage::new();

    let message = create_test_message("q", 1, "data");
    let id = message.id.clone();
    storage.push(message).await.unwrap();

    // First delivery goes to c1, which then rejects it.
    storage.pop("q", "c1", 30).await.unwrap().unwrap();
    let was_nacked = storage.nack("q", &id, "c1").await.unwrap();
    assert!(was_nacked);

    let redelivered = storage.pop("q", "c2", 30).await.unwrap().unwrap();
    assert_eq!(redelivered.id, id);
    assert_eq!(redelivered.retry_count, 2);
}
/// With `max_retries == 1`, nacking after the first delivery succeeds but the
/// message is no longer available to pop from the queue.
#[tokio::test]
async fn test_memory_nack_dead_letter() {
    init_tracing();
    let storage = MemoryStorage::new();

    let mut message = create_test_message("q", 1, "data");
    message.max_retries = 1;
    let id = message.id.clone();
    storage.push(message).await.unwrap();

    storage.pop("q", "c1", 30).await.unwrap().unwrap();
    let was_nacked = storage.nack("q", &id, "c1").await.unwrap();
    assert!(was_nacked);

    let leftover = storage.pop("q", "c1", 30).await.unwrap();
    assert!(leftover.is_none());
}
/// A message popped with a one-second lock is released by
/// `unlock_expired_messages` after two seconds and can then be popped by a
/// different consumer.
#[tokio::test]
async fn test_memory_timeout_unlock() {
    init_tracing();
    let storage = MemoryStorage::new();

    let message = create_test_message("q", 1, "data");
    let id = message.id.clone();
    storage.push(message).await.unwrap();

    // Lock for one second, then wait long enough for the lock to lapse.
    storage.pop("q", "c1", 1).await.unwrap().unwrap();
    let wait = tokio::time::Duration::from_secs(2);
    tokio::time::sleep(wait).await;

    let released = storage.unlock_expired_messages().await.unwrap();
    assert_eq!(released, 1);

    let redelivered = storage.pop("q", "c2", 30).await.unwrap().unwrap();
    assert_eq!(redelivered.id, id);
}
/// On a queue with `allow_duplicates: false`, a second message with the same
/// payload is rejected with an error mentioning "Duplicate", even though its
/// id and priority differ.
#[tokio::test]
async fn test_memory_duplicate_rejection() {
    init_tracing();
    let storage = MemoryStorage::new();

    let no_dups = QueueConfig {
        ordering: PriorityOrdering::MaxFirst,
        allow_duplicates: false,
        ..Default::default()
    };
    storage.create_queue("nodup", no_dups).await;

    let first = create_test_message("nodup", 1, "same_payload");
    storage.push(first).await.unwrap();

    let second = create_test_message("nodup", 2, "same_payload");
    let outcome = storage.push(second).await;
    assert!(outcome.is_err());

    let error_text = outcome.unwrap_err().to_string();
    assert!(error_text.contains("Duplicate"));
}
/// `get_all_queue_stats` reports per-queue available/locked/total counts for
/// two implicitly created queues after one message has been popped.
#[tokio::test]
async fn test_memory_queue_stats() {
    init_tracing();
    let storage = MemoryStorage::new();

    // Three messages in "qa".
    for i in 0..3 {
        let payload = format!("a{}", i);
        let msg = create_test_message("qa", i, &payload);
        storage.push(msg).await.unwrap();
    }
    // Two messages in "qb".
    for i in 0..2 {
        let payload = format!("b{}", i);
        let msg = create_test_message("qb", i, &payload);
        storage.push(msg).await.unwrap();
    }

    // Lock one "qa" message so the counters split between available and locked.
    storage.pop("qa", "c1", 30).await.unwrap();

    let all_stats = storage.get_all_queue_stats().await.unwrap();
    assert_eq!(all_stats.len(), 2);

    let qa = all_stats.iter().find(|s| s.name == "qa").unwrap();
    assert_eq!(qa.available, 2);
    assert_eq!(qa.locked, 1);
    assert_eq!(qa.total, 3);

    let qb = all_stats.iter().find(|s| s.name == "qb").unwrap();
    assert_eq!(qb.total, 2);
}
/// Walks one queue through create, push, pop, ack, nack, batch_ack, purge,
/// rename and delete, checking the `available` / `locked` counters (and
/// `list_queues` membership) after each step.
#[tokio::test]
async fn test_memory_queue_counters_track_full_lifecycle() {
init_tracing();
let storage = MemoryStorage::new();
// Phase 1: an explicitly created queue starts with all counters at zero
// and is already listed.
storage
.create_queue(
"lifecycle",
QueueConfig {
ordering: PriorityOrdering::MaxFirst,
..Default::default()
},
)
.await;
let s = storage.get_queue_stats("lifecycle").await.unwrap();
assert_eq!((s.available, s.locked, s.total), (0, 0, 0));
assert_eq!(
storage.list_queues().await.unwrap(),
vec!["lifecycle"],
"configured-but-empty queue must appear in list_queues"
);
// Phase 2: four pushes -> four available, none locked.
for i in 0..4 {
storage
.push(create_test_message("lifecycle", i, &format!("p-{i}")))
.await
.unwrap();
}
let s = storage.get_queue_stats("lifecycle").await.unwrap();
assert_eq!((s.available, s.locked), (4, 0));
// Phase 3: pop three with a long lock, remembering their ids for the
// ack / nack / batch_ack steps below.
let mut popped = Vec::new();
for _ in 0..3 {
popped.push(
storage
.pop("lifecycle", "c1", 3600)
.await
.unwrap()
.unwrap()
.id,
);
}
let s = storage.get_queue_stats("lifecycle").await.unwrap();
assert_eq!((s.available, s.locked), (1, 3));
// Phase 4: ack drops one locked message.
storage.ack("lifecycle", &popped[0], "c1").await.unwrap();
let s = storage.get_queue_stats("lifecycle").await.unwrap();
assert_eq!((s.available, s.locked), (1, 2));
// Phase 5: nack moves one message from locked back to available.
storage.nack("lifecycle", &popped[1], "c1").await.unwrap();
let s = storage.get_queue_stats("lifecycle").await.unwrap();
assert_eq!((s.available, s.locked), (2, 1));
// Phase 6: batch_ack of the last popped id clears the locked counter.
storage
.batch_ack("lifecycle", "c1", std::slice::from_ref(&popped[2]))
.await
.unwrap();
let s = storage.get_queue_stats("lifecycle").await.unwrap();
assert_eq!((s.available, s.locked), (2, 0));
// Phase 7: purge zeroes every counter but keeps the queue listed.
storage.purge_queue("lifecycle").await.unwrap();
let s = storage.get_queue_stats("lifecycle").await.unwrap();
assert_eq!((s.available, s.locked, s.total), (0, 0, 0));
assert!(storage
.list_queues()
.await
.unwrap()
.contains(&"lifecycle".to_string()));
// Phase 8: rename carries the single remaining message (and its counter)
// over to the new name and removes the old name from the listing.
storage
.push(create_test_message("lifecycle", 5, "survivor"))
.await
.unwrap();
storage.rename_queue("lifecycle", "renamed").await.unwrap();
let after = storage.list_queues().await.unwrap();
assert!(after.contains(&"renamed".to_string()));
assert!(!after.contains(&"lifecycle".to_string()));
let s = storage.get_queue_stats("renamed").await.unwrap();
assert_eq!((s.available, s.locked), (1, 0));
// Phase 9: delete removes the queue from the listing entirely.
storage.delete_queue("renamed").await.unwrap();
assert!(
!storage
.list_queues()
.await
.unwrap()
.contains(&"renamed".to_string()),
"delete_queue must remove counter entry"
);
}
/// `delete_queue` returns the number of messages it removed and makes the
/// queue stop existing.
#[tokio::test]
async fn test_memory_delete_queue() {
    init_tracing();
    let storage = MemoryStorage::new();

    let config = QueueConfig {
        ordering: PriorityOrdering::MaxFirst,
        ..Default::default()
    };
    storage.create_queue("del_q", config).await;

    for (prio, payload) in [(1, "msg1"), (2, "msg2")] {
        let msg = create_test_message("del_q", prio, payload);
        storage.push(msg).await.unwrap();
    }

    let exists_before = storage.queue_exists("del_q").await.unwrap();
    assert!(exists_before);

    let removed = storage.delete_queue("del_q").await.unwrap();
    assert_eq!(removed, 2);

    let exists_after = storage.queue_exists("del_q").await.unwrap();
    assert!(!exists_after);
}
/// `purge_queue` returns the number of messages removed, leaves the queue in
/// place, and resets its total to zero.
#[tokio::test]
async fn test_memory_purge_queue() {
    init_tracing();
    let storage = MemoryStorage::new();

    let config = QueueConfig {
        ordering: PriorityOrdering::MaxFirst,
        ..Default::default()
    };
    storage.create_queue("purge_q", config).await;

    for (prio, payload) in [(1, "msg1"), (2, "msg2")] {
        let msg = create_test_message("purge_q", prio, payload);
        storage.push(msg).await.unwrap();
    }

    let removed = storage.purge_queue("purge_q").await.unwrap();
    assert_eq!(removed, 2);

    // Unlike delete, purge keeps the queue itself.
    let still_exists = storage.queue_exists("purge_q").await.unwrap();
    assert!(still_exists);

    let after = storage.get_queue_stats("purge_q").await.unwrap();
    assert_eq!(after.total, 0);
}
/// Renaming a queue moves its messages and its configured ordering to the new
/// name and makes the old name stop existing.
#[tokio::test]
async fn test_memory_rename_queue() {
    init_tracing();
    let storage = MemoryStorage::new();

    let config = QueueConfig {
        ordering: PriorityOrdering::MinFirst,
        ..Default::default()
    };
    storage.create_queue("old_name", config).await;

    for (prio, payload) in [(1, "msg1"), (2, "msg2")] {
        let msg = create_test_message("old_name", prio, payload);
        storage.push(msg).await.unwrap();
    }

    storage.rename_queue("old_name", "new_name").await.unwrap();

    let old_exists = storage.queue_exists("old_name").await.unwrap();
    assert!(!old_exists);
    let new_exists = storage.queue_exists("new_name").await.unwrap();
    assert!(new_exists);

    let renamed_stats = storage.get_queue_stats("new_name").await.unwrap();
    assert_eq!(renamed_stats.total, 2);
    assert_eq!(renamed_stats.config.ordering, PriorityOrdering::MinFirst);
}
/// `batch_ack` acknowledges every locked id it is given, reports none as
/// missing, and leaves the queue empty.
#[tokio::test]
async fn test_memory_batch_ack() {
    init_tracing();
    let storage = MemoryStorage::new();

    let mut ids = Vec::new();
    for i in 0..3 {
        let payload = format!("msg{}", i);
        let msg = create_test_message("bq", i, &payload);
        ids.push(msg.id.clone());
        storage.push(msg).await.unwrap();
    }

    // Lock all three for the same consumer.
    for _ in 0..3 {
        storage.pop("bq", "c1", 30).await.unwrap().unwrap();
    }

    let outcome = storage.batch_ack("bq", "c1", &ids).await.unwrap();
    assert_eq!(outcome.acked.len(), 3);
    assert!(outcome.not_found.is_empty());

    let after = storage.get_queue_stats("bq").await.unwrap();
    assert_eq!(after.total, 0);
}
/// `batch_nack` splits its input by retry budget: the message with retries
/// left is reported as unlocked, the one with `max_retries == 1` as
/// dead-lettered.
#[tokio::test]
async fn test_memory_batch_nack() {
    init_tracing();
    let storage = MemoryStorage::new();

    let mut retryable = create_test_message("bnq", 1, "retry_me");
    retryable.max_retries = 5;
    let retryable_id = retryable.id.clone();

    let mut exhausted = create_test_message("bnq", 2, "dlq_me");
    exhausted.max_retries = 1;
    let exhausted_id = exhausted.id.clone();

    storage.push(retryable).await.unwrap();
    storage.push(exhausted).await.unwrap();

    // Lock both messages for c1.
    storage.pop("bnq", "c1", 30).await.unwrap().unwrap();
    storage.pop("bnq", "c1", 30).await.unwrap().unwrap();

    let outcome = storage
        .batch_nack("bnq", "c1", &[retryable_id.clone(), exhausted_id.clone()])
        .await
        .unwrap();

    assert_eq!(outcome.unlocked.len(), 1);
    assert!(outcome.unlocked.contains(&retryable_id));
    assert_eq!(outcome.dead_lettered.len(), 1);
    assert!(outcome.dead_lettered.contains(&exhausted_id));
}
/// An `ack` for an id that exists only as an injected locked-id index entry
/// returns `false` and removes that entry.
#[tokio::test]
async fn test_memory_ack_fast_path_cleans_stale_secondary_index_entry() {
    init_tracing();
    let storage = MemoryStorage::new();

    // Plant an index entry without pushing any message behind it.
    let stale_id = "ghost-id";
    let stale_key = "q/ghost-key";
    storage
        .__test_insert_locked_id_index_entry("q", stale_id, stale_key)
        .await;

    let acked = storage.ack("q", stale_id, "c1").await.unwrap();
    assert!(!acked);

    let still_indexed = storage.__test_locked_id_index_has("q", stale_id).await;
    assert!(
        !still_indexed,
        "ack fast path must clean stale secondary-index entries"
    );
}
/// A `nack` for an id that exists only as an injected locked-id index entry
/// returns `false` and removes that entry.
#[tokio::test]
async fn test_memory_nack_fast_path_cleans_stale_secondary_index_entry() {
    init_tracing();
    let storage = MemoryStorage::new();

    // Plant an index entry without pushing any message behind it.
    let stale_id = "ghost-id";
    let stale_key = "q/ghost-key";
    storage
        .__test_insert_locked_id_index_entry("q", stale_id, stale_key)
        .await;

    let nacked = storage.nack("q", stale_id, "c1").await.unwrap();
    assert!(!nacked);

    let still_indexed = storage.__test_locked_id_index_has("q", stale_id).await;
    assert!(
        !still_indexed,
        "nack fast path must clean stale secondary-index entries"
    );
}
/// A `renew` for an id that exists only as an injected locked-id index entry
/// returns `false` and removes that entry.
#[tokio::test]
async fn test_memory_renew_fast_path_cleans_stale_secondary_index_entry() {
    init_tracing();
    let storage = MemoryStorage::new();

    // Plant an index entry without pushing any message behind it.
    let stale_id = "ghost-id";
    let stale_key = "q/ghost-key";
    storage
        .__test_insert_locked_id_index_entry("q", stale_id, stale_key)
        .await;

    let renewed = storage.renew("q", stale_id, "c1", 30).await.unwrap();
    assert!(!renewed);

    let still_indexed = storage.__test_locked_id_index_has("q", stale_id).await;
    assert!(
        !still_indexed,
        "renew fast path must clean stale secondary-index entries"
    );
}
/// `batch_ack` on an id that exists only as an injected locked-id index entry
/// reports it as not found, acks nothing, and removes the entry.
#[tokio::test]
async fn test_memory_batch_ack_fast_path_cleans_stale_index_entries() {
    init_tracing();
    let storage = MemoryStorage::new();

    let stale_id = "ghost-id";
    let stale_key = "some_queue/ghost-key";
    storage
        .__test_insert_locked_id_index_entry("some_queue", stale_id, stale_key)
        .await;

    let indexed_before = storage
        .__test_locked_id_index_has("some_queue", stale_id)
        .await;
    assert!(indexed_before, "precondition: stale entry is present");

    let outcome = storage
        .batch_ack("some_queue", "c1", &[stale_id.to_string()])
        .await
        .unwrap();
    assert!(outcome.acked.is_empty());
    assert_eq!(outcome.not_found, vec![stale_id.to_string()]);

    let indexed_after = storage
        .__test_locked_id_index_has("some_queue", stale_id)
        .await;
    assert!(
        !indexed_after,
        "batch_ack fast path must clean stale secondary-index entries"
    );
}
/// `batch_nack` on an id that exists only as an injected locked-id index
/// entry reports it as not found, unlocks and dead-letters nothing, and
/// removes the entry.
#[tokio::test]
async fn test_memory_batch_nack_fast_path_cleans_stale_index_entries() {
    init_tracing();
    let storage = MemoryStorage::new();

    let stale_id = "ghost-id";
    let stale_key = "some_queue/ghost-key";
    storage
        .__test_insert_locked_id_index_entry("some_queue", stale_id, stale_key)
        .await;

    let outcome = storage
        .batch_nack("some_queue", "c1", &[stale_id.to_string()])
        .await
        .unwrap();
    assert!(outcome.unlocked.is_empty());
    assert!(outcome.dead_lettered.is_empty());
    assert_eq!(outcome.not_found, vec![stale_id.to_string()]);

    let indexed_after = storage
        .__test_locked_id_index_has("some_queue", stale_id)
        .await;
    assert!(
        !indexed_after,
        "batch_nack fast path must clean stale secondary-index entries"
    );
}
/// `pop` adds the message id to the locked-id index, `ack` removes it, and a
/// repeated `ack` for the same id returns `false`.
#[tokio::test]
async fn test_memory_ack_uses_secondary_index() {
    init_tracing();
    let storage = MemoryStorage::new();

    let msg = create_test_message("q", 1, "p");
    storage.push(msg).await.unwrap();

    let locked = storage.pop("q", "c1", 3600).await.unwrap().unwrap();
    let indexed_after_pop = storage.__test_locked_id_index_has("q", &locked.id).await;
    assert!(indexed_after_pop, "pop must populate secondary index");

    let first_ack = storage.ack("q", &locked.id, "c1").await.unwrap();
    assert!(first_ack);

    let indexed_after_ack = storage.__test_locked_id_index_has("q", &locked.id).await;
    assert!(!indexed_after_ack, "ack must remove secondary-index entry");

    // The message is gone, so acknowledging it again must report failure.
    let second_ack = storage.ack("q", &locked.id, "c1").await.unwrap();
    assert!(!second_ack);
}
/// Queues created implicitly by `push` are returned by `list_queues` in the
/// order alpha, beta, gamma.
#[tokio::test]
async fn test_memory_list_queues() {
    init_tracing();
    let storage = MemoryStorage::new();

    for (queue, payload) in [("alpha", "a"), ("beta", "b"), ("gamma", "c")] {
        let msg = create_test_message(queue, 1, payload);
        storage.push(msg).await.unwrap();
    }

    let names = storage.list_queues().await.unwrap();
    assert_eq!(names, vec!["alpha", "beta", "gamma"]);
}
/// `queue_exists` is false for an unknown queue, true once it has been
/// created, and false again after it has been deleted.
#[tokio::test]
async fn test_memory_queue_exists() {
    init_tracing();
    let storage = MemoryStorage::new();

    let missing = storage.queue_exists("missing").await.unwrap();
    assert!(!missing);

    let config = QueueConfig {
        ordering: PriorityOrdering::MaxFirst,
        ..Default::default()
    };
    storage.create_queue("exists", config).await;
    let after_create = storage.queue_exists("exists").await.unwrap();
    assert!(after_create);

    storage.delete_queue("exists").await.unwrap();
    let after_delete = storage.queue_exists("exists").await.unwrap();
    assert!(!after_delete);
}
/// Exercises `renew` on a locked message.
///
/// * The owning consumer can renew its lock, and the message stays
///   unavailable to `pop` afterwards.
/// * A consumer that does not hold the lock cannot renew it.
#[tokio::test]
async fn test_memory_renew_lock() {
    init_tracing();
    let storage = MemoryStorage::new();

    let msg = create_test_message("rq", 1, "data");
    let msg_id = msg.id.clone();
    storage.push(msg).await.unwrap();

    // A successful pop must hand back a message that carries a lock expiry.
    let popped = storage.pop("rq", "c1", 5).await.unwrap().unwrap();
    assert!(popped.locked_until.is_some());

    let renewed = storage.renew("rq", &msg_id, "c1", 60).await.unwrap();
    assert!(renewed);

    // The only message in the queue is still locked, so nothing can be popped.
    let second_pop = storage.pop("rq", "c1", 30).await.unwrap();
    assert!(second_pop.is_none());

    let acked = storage.ack("rq", &msg_id, "c1").await.unwrap();
    assert!(acked);

    let msg2 = create_test_message("rq", 1, "data2");
    let msg2_id = msg2.id.clone();
    storage.push(msg2).await.unwrap();

    // Make sure msg2 really is locked by c1 before testing the wrong-consumer
    // path. Otherwise the `renew` below would return `false` simply because
    // nothing is locked, and the test would pass vacuously.
    let locked = storage.pop("rq", "c1", 30).await.unwrap().unwrap();
    assert_eq!(locked.id, msg2_id);

    let not_renewed = storage
        .renew("rq", &msg2_id, "wrong_consumer", 60)
        .await
        .unwrap();
    assert!(!not_renewed);
}
/// Re-creating a queue with `allow_duplicates: false` after duplicates were
/// pushed leaves two messages in it and makes later duplicate pushes fail.
#[tokio::test]
async fn test_configure_queue_disable_duplicates_dedupes_messages() {
    init_tracing();
    let storage = MemoryStorage::new();

    let permissive = QueueConfig {
        allow_duplicates: true,
        ordering: PriorityOrdering::MaxFirst,
        ..Default::default()
    };
    storage.create_queue("dup_q", permissive).await;

    // Two messages share "payload_a"; one has a distinct payload.
    let fixtures = [(1, "payload_a"), (2, "payload_a"), (3, "payload_b")];
    for (prio, payload) in fixtures {
        let msg = create_test_message("dup_q", prio, payload);
        storage.push(msg).await.unwrap();
    }

    // Reconfigure the same queue so duplicates are no longer allowed.
    let strict = QueueConfig {
        allow_duplicates: false,
        ordering: PriorityOrdering::MaxFirst,
        ..Default::default()
    };
    storage.create_queue("dup_q", strict).await;

    let after = storage.get_queue_stats("dup_q").await.unwrap();
    assert_eq!(after.total, 2, "dedup should leave exactly 2 messages");

    let repeat = create_test_message("dup_q", 5, "payload_a");
    let push_outcome = storage.push(repeat).await;
    assert!(
        push_outcome.is_err(),
        "duplicate payload must be rejected after disabling allow_duplicates"
    );
}
/// Re-creating a queue with `allow_duplicates: true` lets a payload that is
/// already present be pushed again.
#[tokio::test]
async fn test_configure_queue_enable_duplicates_drops_payload_set() {
    init_tracing();
    let storage = MemoryStorage::new();

    let strict = QueueConfig {
        allow_duplicates: false,
        ordering: PriorityOrdering::MaxFirst,
        ..Default::default()
    };
    storage.create_queue("nd_q", strict).await;

    let original = create_test_message("nd_q", 1, "unique");
    storage.push(original).await.unwrap();

    // Reconfigure the same queue so duplicates are allowed.
    let permissive = QueueConfig {
        allow_duplicates: true,
        ordering: PriorityOrdering::MaxFirst,
        ..Default::default()
    };
    storage.create_queue("nd_q", permissive).await;

    let repeat = create_test_message("nd_q", 2, "unique");
    let push_outcome = storage.push(repeat).await;
    assert!(
        push_outcome.is_ok(),
        "duplicate push must succeed after enabling allow_duplicates"
    );
}
/// Renaming a queue to its own name succeeds.
#[tokio::test]
async fn test_memory_rename_queue_same_name_returns_ok() {
    init_tracing();
    let storage = MemoryStorage::new();

    // Pushing creates the queue implicitly.
    let seed = create_test_message("mem_same", 1, "x");
    storage.push(seed).await.unwrap();

    let result = storage.rename_queue("mem_same", "mem_same").await;
    assert!(
        result.is_ok(),
        "rename to same name must succeed: {result:?}"
    );
}
/// Renaming a queue that was never created yields `RenameQueueError::NotFound`.
#[tokio::test]
async fn test_memory_rename_queue_not_found_returns_error() {
    init_tracing();
    let storage = MemoryStorage::new();

    let result = storage.rename_queue("ghost", "target").await;
    let is_not_found = matches!(result, Err(RenameQueueError::NotFound));
    assert!(is_not_found, "expected NotFound, got {result:?}");
}
/// Renaming onto the name of another existing queue yields
/// `RenameQueueError::AlreadyExists`.
#[tokio::test]
async fn test_memory_rename_queue_already_exists_returns_error() {
    init_tracing();
    let storage = MemoryStorage::new();

    // Both source and destination queues exist before the rename.
    for (queue, payload) in [("mem_from", "a"), ("mem_to", "b")] {
        let msg = create_test_message(queue, 1, payload);
        storage.push(msg).await.unwrap();
    }

    let result = storage.rename_queue("mem_from", "mem_to").await;
    let is_conflict = matches!(result, Err(RenameQueueError::AlreadyExists));
    assert!(is_conflict, "expected AlreadyExists, got {result:?}");
}
/// An empty source or destination name makes `rename_queue` fail with
/// `RenameQueueError::Storage`.
#[tokio::test]
async fn test_memory_rename_queue_empty_name_returns_storage_error() {
    init_tracing();
    let storage = MemoryStorage::new();

    let empty_source = storage.rename_queue("", "name").await;
    assert!(matches!(empty_source, Err(RenameQueueError::Storage(_))));

    let empty_target = storage.rename_queue("name", "").await;
    assert!(matches!(empty_target, Err(RenameQueueError::Storage(_))));
}
use qrusty::message::PriorityKind;
/// Builds a fresh, unlocked [`Message`] with a text priority for use in tests.
///
/// Apart from using `Priority::Text`, the fields are filled in the same way
/// as in `create_test_message`: random UUID id, current UTC `created_at`,
/// zero retries so far and a retry budget of three.
fn create_text_msg(queue: &str, priority: &str, payload: &str) -> Message {
    let id = uuid::Uuid::new_v4().to_string();
    let text_priority = Priority::Text(priority.to_owned());
    Message {
        id,
        queue: queue.to_owned(),
        priority: text_priority,
        payload: payload.to_owned(),
        created_at: Utc::now(),
        locked_until: None,
        locked_by: None,
        retry_count: 0,
        max_retries: 3,
        payload_ref: None,
        payload_hash: None,
    }
}
/// On a text-priority queue with `MinFirst` ordering, messages pop in the
/// order alpha, bravo, charlie.
#[tokio::test]
async fn test_memory_text_priority_min_first() {
    let storage = MemoryStorage::new();
    storage
        .create_queue(
            "tmin",
            QueueConfig {
                ordering: PriorityOrdering::MinFirst,
                priority_kind: PriorityKind::Text,
                ..Default::default()
            },
        )
        .await;

    // Pushed out of order on purpose.
    for (priority, payload) in [("charlie", "c"), ("alpha", "a"), ("bravo", "b")] {
        let msg = create_text_msg("tmin", priority, payload);
        storage.push(msg).await.unwrap();
    }

    // Each pop uses its own consumer id and is checked before the next pop.
    for (consumer, expected_payload) in [("c1", "a"), ("c2", "b"), ("c3", "c")] {
        let msg = storage.pop("tmin", consumer, 30).await.unwrap().unwrap();
        assert_eq!(msg.payload, expected_payload);
    }
}
/// On a text-priority queue with `MaxFirst` ordering, messages pop in the
/// order charlie, bravo, alpha.
#[tokio::test]
async fn test_memory_text_priority_max_first() {
    let storage = MemoryStorage::new();
    storage
        .create_queue(
            "tmax",
            QueueConfig {
                ordering: PriorityOrdering::MaxFirst,
                priority_kind: PriorityKind::Text,
                ..Default::default()
            },
        )
        .await;

    // Pushed out of order on purpose.
    for (priority, payload) in [("alpha", "a"), ("charlie", "c"), ("bravo", "b")] {
        let msg = create_text_msg("tmax", priority, payload);
        storage.push(msg).await.unwrap();
    }

    // Each pop uses its own consumer id and is checked before the next pop.
    for (consumer, expected_payload) in [("c1", "c"), ("c2", "b"), ("c3", "a")] {
        let msg = storage.pop("tmax", consumer, 30).await.unwrap().unwrap();
        assert_eq!(msg.payload, expected_payload);
    }
}
/// Pushing a message whose priority kind differs from the queue's is
/// rejected in both directions: numeric into a text queue, and text into a
/// default-configured queue.
#[tokio::test]
async fn test_memory_text_priority_kind_mismatch() {
    let storage = MemoryStorage::new();

    // Text-only queue must refuse a numeric-priority message.
    let text_config = QueueConfig {
        ordering: PriorityOrdering::MinFirst,
        priority_kind: PriorityKind::Text,
        ..Default::default()
    };
    storage.create_queue("tonly", text_config).await;
    let numeric_msg = create_test_message("tonly", 42, "nope");
    let numeric_outcome = storage.push(numeric_msg).await;
    assert!(numeric_outcome.is_err());

    // Default-configured queue must refuse a text-priority message.
    storage.create_queue("nonly", QueueConfig::default()).await;
    let text_msg = create_text_msg("nonly", "hi", "nope");
    let text_outcome = storage.push(text_msg).await;
    assert!(text_outcome.is_err());
}
/// After a message with `max_retries == 0` is nacked, the queue total drops
/// to zero, and the queue can then be deleted and re-created under the same
/// name.
#[tokio::test]
async fn test_memory_delete_queue_removes_dlq_entries() {
    let storage = MemoryStorage::new();
    let queue_config = QueueConfig::default();
    storage.create_queue("dlq_test", queue_config).await;

    let mut doomed = create_test_message("dlq_test", 100, "will_dlq");
    doomed.max_retries = 0;
    let doomed_id = doomed.id.clone();
    storage.push(doomed).await.unwrap();

    let delivered = storage.pop("dlq_test", "c1", 30).await.unwrap().unwrap();
    assert_eq!(delivered.id, doomed_id);
    storage.nack("dlq_test", &doomed_id, "c1").await.unwrap();

    let after_nack = storage.get_queue_stats("dlq_test").await.unwrap();
    assert_eq!(after_nack.total, 0);

    storage.delete_queue("dlq_test").await.unwrap();
    let exists_after_delete = storage.queue_exists("dlq_test").await.unwrap();
    assert!(!exists_after_delete);

    // Re-creating the same name must work after the delete.
    storage
        .create_queue("dlq_test", QueueConfig::default())
        .await;
    let exists_after_recreate = storage.queue_exists("dlq_test").await.unwrap();
    assert!(exists_after_recreate);
}
/// Text priorities containing `/` sort correctly under `MinFirst` and come
/// back from `pop` unchanged.
#[tokio::test]
async fn test_memory_text_priority_with_slash_min_first() {
    let storage = MemoryStorage::new();
    storage
        .create_queue(
            "slash_q",
            QueueConfig {
                ordering: PriorityOrdering::MinFirst,
                priority_kind: PriorityKind::Text,
                ..Default::default()
            },
        )
        .await;

    let pushes = [
        ("/usr/local/bin", "bin"),
        ("/usr/local/share", "share"),
        ("/etc/config", "etc"),
    ];
    for (priority, payload) in pushes {
        let msg = create_text_msg("slash_q", priority, payload);
        storage.push(msg).await.unwrap();
    }

    // Expected pop order as (payload, priority) pairs.
    let expected = [
        ("etc", "/etc/config"),
        ("bin", "/usr/local/bin"),
        ("share", "/usr/local/share"),
    ];
    for (idx, &(payload, priority)) in expected.iter().enumerate() {
        let msg = storage.pop("slash_q", "c1", 30).await.unwrap().unwrap();
        assert_eq!(msg.payload, payload);
        assert_eq!(msg.priority, Priority::Text(priority.to_string()));

        // Every message except the final one is acknowledged before the next pop.
        if idx + 1 < expected.len() {
            storage.ack("slash_q", &msg.id, "c1").await.unwrap();
        }
    }
}
/// Text priorities containing `/` sort correctly under `MaxFirst` and come
/// back from `pop` unchanged.
#[tokio::test]
async fn test_memory_text_priority_with_slash_max_first() {
    let storage = MemoryStorage::new();
    storage
        .create_queue(
            "slash_max_q",
            QueueConfig {
                ordering: PriorityOrdering::MaxFirst,
                priority_kind: PriorityKind::Text,
                ..Default::default()
            },
        )
        .await;

    for (priority, payload) in [("/a/first", "first"), ("/z/last", "last")] {
        let msg = create_text_msg("slash_max_q", priority, payload);
        storage.push(msg).await.unwrap();
    }

    // The lexicographically greatest priority comes out first.
    let top = storage.pop("slash_max_q", "c1", 30).await.unwrap().unwrap();
    assert_eq!(top.payload, "last");
    assert_eq!(top.priority, Priority::Text("/z/last".to_string()));
    storage.ack("slash_max_q", &top.id, "c1").await.unwrap();

    let bottom = storage.pop("slash_max_q", "c1", 30).await.unwrap().unwrap();
    assert_eq!(bottom.payload, "first");
    assert_eq!(bottom.priority, Priority::Text("/a/first".to_string()));
}
/// A text priority containing several `/` characters does not disturb the
/// queue's stats or its entry in `list_queues`.
#[tokio::test]
async fn test_memory_text_priority_slash_stats_not_corrupted() {
    let storage = MemoryStorage::new();
    storage
        .create_queue(
            "stats_slash_q",
            QueueConfig {
                ordering: PriorityOrdering::MinFirst,
                priority_kind: PriorityKind::Text,
                ..Default::default()
            },
        )
        .await;

    // One priority full of slashes, one without any.
    for (priority, payload) in [("/a/b/c/d", "p1"), ("simple", "p2")] {
        let msg = create_text_msg("stats_slash_q", priority, payload);
        storage.push(msg).await.unwrap();
    }

    let snapshot = storage.get_queue_stats("stats_slash_q").await.unwrap();
    assert_eq!(snapshot.total, 2);
    assert_eq!(snapshot.available, 2);

    let names = storage.list_queues().await.unwrap();
    assert!(names.contains(&"stats_slash_q".to_string()));
}
/// `force_unlock_queue` releases every locked message in the queue, returns
/// how many it released, and makes them poppable by another consumer.
#[tokio::test]
async fn test_memory_force_unlock_queue() {
    let storage = MemoryStorage::new();
    let config = QueueConfig {
        ordering: PriorityOrdering::MaxFirst,
        ..Default::default()
    };
    storage.create_queue("fu_mem", config).await;

    for i in 0..3 {
        let payload = format!("msg-{}", i);
        let msg = create_test_message("fu_mem", 10 + i, &payload);
        storage.push(msg).await.unwrap();
    }

    // Lock everything for consumer1 with a long timeout.
    for _ in 0..3 {
        storage.pop("fu_mem", "consumer1", 300).await.unwrap();
    }
    let before = storage.get_queue_stats("fu_mem").await.unwrap();
    assert_eq!(before.locked, 3);

    let released = storage.force_unlock_queue("fu_mem").await.unwrap();
    assert_eq!(released, 3);

    // All three must now be available to a different consumer.
    for _ in 0..3 {
        let redelivered = storage.pop("fu_mem", "consumer2", 300).await.unwrap();
        assert!(redelivered.is_some());
    }
}
/// `force_unlock_queue` on a queue that was never created succeeds and
/// reports zero unlocked messages.
#[tokio::test]
async fn test_memory_force_unlock_queue_empty() {
    let storage = MemoryStorage::new();
    let released = storage.force_unlock_queue("nonexistent").await.unwrap();
    assert_eq!(released, 0);
}