use crate::provider_validation::ProviderFactory;
use crate::providers::{SessionFetchConfig, TagFilter, WorkItem};
use std::time::Duration;
/// Builds an `ActivityExecute` work item named `SessionActivity` that is bound
/// to `session_id`.
///
/// The execution id is fixed at 1, the input is an empty JSON object, and no
/// tag is set.
fn session_activity(instance: &str, id: u64, session_id: &str) -> WorkItem {
    let bound_session = Some(session_id.to_owned());
    WorkItem::ActivityExecute {
        instance: instance.to_owned(),
        execution_id: 1,
        id,
        name: String::from("SessionActivity"),
        input: String::from("{}"),
        session_id: bound_session,
        tag: None,
    }
}
/// Builds an `ActivityExecute` work item named `PlainActivity` that carries
/// no session id.
///
/// The execution id is fixed at 1, the input is an empty JSON object, and no
/// tag is set.
fn plain_activity(instance: &str, id: u64) -> WorkItem {
    let owner_instance = instance.to_owned();
    WorkItem::ActivityExecute {
        instance: owner_instance,
        execution_id: 1,
        id,
        name: String::from("PlainActivity"),
        input: String::from("{}"),
        session_id: None,
        tag: None,
    }
}
/// Builds a `SessionFetchConfig` for `owner_id` with a 30-second lock timeout.
fn session_config(owner_id: &str) -> SessionFetchConfig {
    let lock_timeout = Duration::from_secs(30);
    SessionFetchConfig {
        owner_id: owner_id.to_owned(),
        lock_timeout,
    }
}
/// Builds a `SessionFetchConfig` for `owner_id` using the caller-supplied
/// `lock_timeout`.
fn session_config_with_lock(owner_id: &str, lock_timeout: Duration) -> SessionFetchConfig {
    let owner_id = owner_id.to_owned();
    SessionFetchConfig { owner_id, lock_timeout }
}
/// Checks that a work item without a session id is returned to a worker that
/// fetches with a session config.
pub async fn test_non_session_items_fetchable_by_any_worker(factory: &dyn ProviderFactory) {
    let provider = factory.create_provider().await;
    let tags = TagFilter::default();
    let worker_a = session_config("worker-A");

    provider.enqueue_for_worker(plain_activity("inst-1", 1)).await.unwrap();

    let fetched = provider
        .fetch_work_item(Duration::from_secs(5), Duration::ZERO, Some(&worker_a), &tags)
        .await
        .unwrap();
    assert!(fetched.is_some(), "Non-session item should be fetchable by any worker");
}
/// Checks that a session-bound item whose session has not been fetched by
/// anyone yet is returned to the first worker that asks for it.
pub async fn test_session_item_claimable_when_no_session(factory: &dyn ProviderFactory) {
    let provider = factory.create_provider().await;
    let tags = TagFilter::default();
    let worker_a = session_config("worker-A");

    provider
        .enqueue_for_worker(session_activity("inst-1", 1, "session-1"))
        .await
        .unwrap();

    let fetched = provider
        .fetch_work_item(Duration::from_secs(5), Duration::ZERO, Some(&worker_a), &tags)
        .await
        .unwrap();
    assert!(fetched.is_some(), "Unclaimed session item should be fetchable");

    // The returned item must be the session-bound activity that was enqueued.
    match &fetched {
        Some((WorkItem::ActivityExecute { session_id, .. }, _, _)) => {
            assert_eq!(session_id.as_deref(), Some("session-1"));
        }
        _ => panic!("Expected ActivityExecute work item"),
    }
}
/// Checks that the worker which fetched and acked a session's first item can
/// also fetch a later item enqueued for the same session.
pub async fn test_session_affinity_same_worker(factory: &dyn ProviderFactory) {
    let provider = factory.create_provider().await;
    let tags = TagFilter::default();
    let work_lock = Duration::from_secs(5);
    let worker_a = session_config("worker-A");

    // Worker-A fetches and acks the first item of session-1.
    provider
        .enqueue_for_worker(session_activity("inst-1", 1, "session-1"))
        .await
        .unwrap();
    let (_, first_token, _) = provider
        .fetch_work_item(work_lock, Duration::ZERO, Some(&worker_a), &tags)
        .await
        .unwrap()
        .unwrap();
    provider.ack_work_item(&first_token, None).await.unwrap();

    // A second item for the same session must come back to worker-A.
    provider
        .enqueue_for_worker(session_activity("inst-1", 2, "session-1"))
        .await
        .unwrap();
    let second = provider
        .fetch_work_item(work_lock, Duration::ZERO, Some(&worker_a), &tags)
        .await
        .unwrap();
    assert!(
        second.is_some(),
        "Owned session item should be fetchable by owning worker"
    );
}
/// Checks that, after worker-A has fetched and acked an item of a session, a
/// later item of that session is not handed to worker-B.
pub async fn test_session_affinity_blocks_other_worker(factory: &dyn ProviderFactory) {
    let provider = factory.create_provider().await;
    let tags = TagFilter::default();
    let work_lock = Duration::from_secs(5);
    let worker_a = session_config("worker-A");
    let worker_b = session_config("worker-B");

    // Worker-A fetches and acks the first item of session-1.
    provider
        .enqueue_for_worker(session_activity("inst-1", 1, "session-1"))
        .await
        .unwrap();
    let (_, first_token, _) = provider
        .fetch_work_item(work_lock, Duration::ZERO, Some(&worker_a), &tags)
        .await
        .unwrap()
        .unwrap();
    provider.ack_work_item(&first_token, None).await.unwrap();

    // The follow-up item of the same session must stay invisible to worker-B.
    provider
        .enqueue_for_worker(session_activity("inst-1", 2, "session-1"))
        .await
        .unwrap();
    let for_b = provider
        .fetch_work_item(work_lock, Duration::ZERO, Some(&worker_b), &tags)
        .await
        .unwrap();
    assert!(
        for_b.is_none(),
        "Non-owning worker should not fetch owned session items"
    );
}
/// Checks that two workers can each hold a different session and that each
/// worker receives the follow-up item of its own session.
///
/// Worker-A first fetches/acks an item of `session-1`, worker-B an item of
/// `session-2`. One more item is then enqueued for each session, and each
/// worker's next fetch must return the item of the session it fetched first.
///
/// A fetched item that is not an `ActivityExecute` fails the test instead of
/// silently skipping the session-id check.
pub async fn test_different_sessions_different_workers(factory: &dyn ProviderFactory) {
    let provider = factory.create_provider().await;
    let tags = TagFilter::default();
    let work_lock = Duration::from_secs(5);
    let worker_a = session_config("worker-A");
    let worker_b = session_config("worker-B");

    // Worker-A fetches and acks the first item of session-1.
    provider
        .enqueue_for_worker(session_activity("inst-1", 1, "session-1"))
        .await
        .unwrap();
    let (_, token_a, _) = provider
        .fetch_work_item(work_lock, Duration::ZERO, Some(&worker_a), &tags)
        .await
        .unwrap()
        .unwrap();
    provider.ack_work_item(&token_a, None).await.unwrap();

    // Worker-B fetches and acks the first item of session-2.
    provider
        .enqueue_for_worker(session_activity("inst-2", 1, "session-2"))
        .await
        .unwrap();
    let (_, token_b, _) = provider
        .fetch_work_item(work_lock, Duration::ZERO, Some(&worker_b), &tags)
        .await
        .unwrap()
        .unwrap();
    provider.ack_work_item(&token_b, None).await.unwrap();

    // One follow-up item per session.
    provider
        .enqueue_for_worker(session_activity("inst-1", 2, "session-1"))
        .await
        .unwrap();
    provider
        .enqueue_for_worker(session_activity("inst-2", 2, "session-2"))
        .await
        .unwrap();

    let next_for_a = provider
        .fetch_work_item(work_lock, Duration::ZERO, Some(&worker_a), &tags)
        .await
        .unwrap();
    match &next_for_a {
        Some((WorkItem::ActivityExecute { session_id, .. }, _, _)) => {
            assert_eq!(session_id.as_deref(), Some("session-1"));
        }
        Some(_) => panic!("Expected ActivityExecute work item"),
        None => panic!("Worker-A should fetch session-1 item"),
    }

    let next_for_b = provider
        .fetch_work_item(work_lock, Duration::ZERO, Some(&worker_b), &tags)
        .await
        .unwrap();
    match &next_for_b {
        Some((WorkItem::ActivityExecute { session_id, .. }, _, _)) => {
            assert_eq!(session_id.as_deref(), Some("session-2"));
        }
        Some(_) => panic!("Expected ActivityExecute work item"),
        None => panic!("Worker-B should fetch session-2 item"),
    }
}
/// Checks that a worker which does not hold a session skips that session's
/// queued item and receives a queued non-session item instead.
///
/// Worker-A fetches/acks an item of `session-1`; then a second `session-1`
/// item and a plain item are enqueued. Worker-B's fetch must return the plain
/// item. A fetched item that is not an `ActivityExecute` fails the test
/// instead of silently skipping the session-id check.
pub async fn test_mixed_session_and_non_session_items(factory: &dyn ProviderFactory) {
    let provider = factory.create_provider().await;
    let tags = TagFilter::default();
    let work_lock = Duration::from_secs(5);
    let worker_a = session_config("worker-A");
    let worker_b = session_config("worker-B");

    // Worker-A fetches and acks the first item of session-1.
    provider
        .enqueue_for_worker(session_activity("inst-1", 1, "session-1"))
        .await
        .unwrap();
    let (_, token_a, _) = provider
        .fetch_work_item(work_lock, Duration::ZERO, Some(&worker_a), &tags)
        .await
        .unwrap()
        .unwrap();
    provider.ack_work_item(&token_a, None).await.unwrap();

    // Queue a session-1 item ahead of a plain item.
    provider
        .enqueue_for_worker(session_activity("inst-1", 2, "session-1"))
        .await
        .unwrap();
    provider.enqueue_for_worker(plain_activity("inst-2", 1)).await.unwrap();

    let for_b = provider
        .fetch_work_item(work_lock, Duration::ZERO, Some(&worker_b), &tags)
        .await
        .unwrap();
    match &for_b {
        Some((WorkItem::ActivityExecute { session_id, .. }, _, _)) => {
            assert!(session_id.is_none(), "Worker-B should get a non-session item");
        }
        Some(_) => panic!("Expected ActivityExecute work item"),
        None => panic!("Worker-B should get the plain item"),
    }
}
/// Checks that a different worker can fetch a session's item once the
/// original worker's 50 ms session lock timeout has elapsed.
pub async fn test_session_claimable_after_lock_expiry(factory: &dyn ProviderFactory) {
    let provider = factory.create_provider().await;
    let tags = TagFilter::default();
    let work_lock = Duration::from_secs(5);
    let short_lived_a = session_config_with_lock("worker-A", Duration::from_millis(50));
    let worker_b = session_config("worker-B");

    // Worker-A fetches with a very short session lock and acks.
    provider
        .enqueue_for_worker(session_activity("inst-1", 1, "session-1"))
        .await
        .unwrap();
    let (_, token_a, _) = provider
        .fetch_work_item(work_lock, Duration::ZERO, Some(&short_lived_a), &tags)
        .await
        .unwrap()
        .unwrap();
    provider.ack_work_item(&token_a, None).await.unwrap();

    // Wait longer than the 50 ms session lock timeout.
    tokio::time::sleep(Duration::from_millis(100)).await;

    provider
        .enqueue_for_worker(session_activity("inst-1", 2, "session-1"))
        .await
        .unwrap();
    let for_b = provider
        .fetch_work_item(work_lock, Duration::ZERO, Some(&worker_b), &tags)
        .await
        .unwrap();
    assert!(for_b.is_some(), "Worker-B should claim expired session");
}
/// Checks that fetching with no session config (`None`) only returns
/// non-session items, while the same queue is visible to a fetch that
/// supplies a session config.
///
/// A fetched item that is not an `ActivityExecute` fails the test instead of
/// silently skipping the checks (and, for the first fetch, the ack).
pub async fn test_none_session_skips_session_items(factory: &dyn ProviderFactory) {
    let provider = factory.create_provider().await;
    let tags = TagFilter::default();
    let work_lock = Duration::from_secs(5);

    // The session item is queued first, the plain item second.
    provider
        .enqueue_for_worker(session_activity("inst-1", 1, "session-1"))
        .await
        .unwrap();
    provider.enqueue_for_worker(plain_activity("inst-2", 2)).await.unwrap();

    // A session-less fetch must skip the session item and return the plain one.
    let plain = provider
        .fetch_work_item(work_lock, Duration::ZERO, None, &tags)
        .await
        .unwrap();
    match plain {
        Some((WorkItem::ActivityExecute { session_id, id, .. }, token, _)) => {
            assert!(session_id.is_none(), "Should be a non-session item");
            assert_eq!(id, 2, "Should get the plain activity");
            provider.ack_work_item(&token, None).await.unwrap();
        }
        Some(_) => panic!("Expected ActivityExecute work item"),
        None => panic!("Should fetch a non-session item with session=None"),
    }

    // Only the session item is left; a session-less fetch must not see it.
    let leftover = provider
        .fetch_work_item(work_lock, Duration::ZERO, None, &tags)
        .await
        .unwrap();
    assert!(leftover.is_none(), "Session item should be invisible to None fetch");

    // With a session config the session item becomes fetchable.
    let worker_a = session_config("worker-A");
    let with_session = provider
        .fetch_work_item(work_lock, Duration::ZERO, Some(&worker_a), &tags)
        .await
        .unwrap();
    match &with_session {
        Some((WorkItem::ActivityExecute { session_id, .. }, _, _)) => {
            assert_eq!(session_id.as_deref(), Some("session-1"));
        }
        Some(_) => panic!("Expected ActivityExecute work item"),
        None => panic!("Session item should be visible with Some config"),
    }
}
/// Checks that a single owner fetching with a session config drains session
/// and non-session items alike, in the order they were enqueued.
///
/// Three items on three different sessions and one plain item are enqueued;
/// four fetch/ack rounds with the same owner id must yield ids 1, 2, 3, 4,
/// after which the queue must be empty.
pub async fn test_some_session_returns_all_items(factory: &dyn ProviderFactory) {
    let provider = factory.create_provider().await;
    let tags = TagFilter::default();
    let work_lock = Duration::from_secs(5);
    let shared = session_config("shared-proc");

    let queued = [
        session_activity("inst-1", 1, "sess-A"),
        session_activity("inst-2", 2, "sess-B"),
        session_activity("inst-3", 3, "sess-C"),
        plain_activity("inst-4", 4),
    ];
    for work in queued {
        provider.enqueue_for_worker(work).await.unwrap();
    }

    // Fetch and ack four times, recording the activity ids in arrival order.
    let mut seen_ids = Vec::new();
    for _round in 0..4 {
        let (work, token, _) = provider
            .fetch_work_item(work_lock, Duration::ZERO, Some(&shared), &tags)
            .await
            .unwrap()
            .expect("Should fetch item");
        if let WorkItem::ActivityExecute { id, .. } = &work {
            seen_ids.push(*id);
        }
        provider.ack_work_item(&token, None).await.unwrap();
    }
    assert_eq!(
        seen_ids,
        vec![1, 2, 3, 4],
        "All items should be fetched in FIFO order"
    );

    let after_drain = provider
        .fetch_work_item(work_lock, Duration::ZERO, Some(&shared), &tags)
        .await
        .unwrap();
    assert!(after_drain.is_none(), "Queue should be empty");
}
/// Checks that `renew_session_lock` reports one renewed session per session
/// held by the worker: 1 after the first session, 2 after a second one.
pub async fn test_renew_session_lock_active(factory: &dyn ProviderFactory) {
    let provider = factory.create_provider().await;
    let tags = TagFilter::default();
    let work_lock = Duration::from_secs(5);
    let owners = ["worker-A"];
    let renew_lock = Duration::from_secs(30);
    let idle_window = Duration::from_secs(300);

    // First session: fetched with a 5-second session lock, then acked.
    provider
        .enqueue_for_worker(session_activity("inst-1", 1, "session-1"))
        .await
        .unwrap();
    let first_cfg = session_config_with_lock("worker-A", Duration::from_secs(5));
    let (_, first_token, _) = provider
        .fetch_work_item(work_lock, Duration::ZERO, Some(&first_cfg), &tags)
        .await
        .unwrap()
        .unwrap();
    provider.ack_work_item(&first_token, None).await.unwrap();

    let renewed_once = provider
        .renew_session_lock(&owners, renew_lock, idle_window)
        .await
        .unwrap();
    assert_eq!(renewed_once, 1, "Should renew 1 active session");

    // Second session for the same worker.
    provider
        .enqueue_for_worker(session_activity("inst-2", 1, "session-2"))
        .await
        .unwrap();
    let second_cfg = session_config("worker-A");
    let (_, second_token, _) = provider
        .fetch_work_item(work_lock, Duration::ZERO, Some(&second_cfg), &tags)
        .await
        .unwrap()
        .unwrap();
    provider.ack_work_item(&second_token, None).await.unwrap();

    let renewed_twice = provider
        .renew_session_lock(&owners, renew_lock, idle_window)
        .await
        .unwrap();
    assert_eq!(renewed_twice, 2, "Should renew 2 active sessions");
}
/// Checks that `renew_session_lock` renews nothing when called with a zero
/// third argument, so the worker's only session is skipped as idle.
pub async fn test_renew_session_lock_skips_idle(factory: &dyn ProviderFactory) {
    let provider = factory.create_provider().await;
    let tags = TagFilter::default();
    let worker_a = session_config("worker-A");

    provider
        .enqueue_for_worker(session_activity("inst-1", 1, "session-1"))
        .await
        .unwrap();
    let (_, lock_token, _) = provider
        .fetch_work_item(Duration::from_secs(5), Duration::ZERO, Some(&worker_a), &tags)
        .await
        .unwrap()
        .unwrap();
    provider.ack_work_item(&lock_token, None).await.unwrap();

    // Zero idle window: the session just used is expected to be skipped.
    let renewed = provider
        .renew_session_lock(&["worker-A"], Duration::from_secs(30), Duration::ZERO)
        .await
        .unwrap();
    assert_eq!(renewed, 0, "Should skip idle session");
}
/// Checks that `renew_session_lock` returns 0 on a fresh provider where the
/// worker has never fetched any session item.
pub async fn test_renew_session_lock_no_sessions(factory: &dyn ProviderFactory) {
    let provider = factory.create_provider().await;
    let owners = ["worker-A"];
    let renewed = provider
        .renew_session_lock(&owners, Duration::from_secs(30), Duration::from_secs(300))
        .await
        .unwrap();
    assert_eq!(renewed, 0, "Worker with no sessions should renew 0");
}
/// Checks that `cleanup_orphaned_sessions` reports one cleaned-up session
/// when the session's 50 ms lock timeout has elapsed and its only item was
/// already acked.
pub async fn test_cleanup_removes_expired_no_items(factory: &dyn ProviderFactory) {
    let provider = factory.create_provider().await;
    let tags = TagFilter::default();
    let short_lived = session_config_with_lock("worker-A", Duration::from_millis(50));

    provider
        .enqueue_for_worker(session_activity("inst-1", 1, "session-1"))
        .await
        .unwrap();
    let (_, lock_token, _) = provider
        .fetch_work_item(Duration::from_secs(5), Duration::ZERO, Some(&short_lived), &tags)
        .await
        .unwrap()
        .unwrap();
    provider.ack_work_item(&lock_token, None).await.unwrap();

    // Wait longer than the 50 ms session lock timeout.
    tokio::time::sleep(Duration::from_millis(100)).await;

    let cleaned = provider
        .cleanup_orphaned_sessions(Duration::from_secs(300))
        .await
        .unwrap();
    assert_eq!(cleaned, 1, "Should clean up 1 expired session");
}
/// Checks that `cleanup_orphaned_sessions` leaves a session alone while an
/// item for it is still queued, even after its 50 ms lock timeout has elapsed.
pub async fn test_cleanup_keeps_sessions_with_pending_items(factory: &dyn ProviderFactory) {
    let provider = factory.create_provider().await;
    let tags = TagFilter::default();
    let short_lived = session_config_with_lock("worker-A", Duration::from_millis(50));

    provider
        .enqueue_for_worker(session_activity("inst-1", 1, "session-1"))
        .await
        .unwrap();
    let (_, lock_token, _) = provider
        .fetch_work_item(Duration::from_secs(5), Duration::ZERO, Some(&short_lived), &tags)
        .await
        .unwrap()
        .unwrap();
    provider.ack_work_item(&lock_token, None).await.unwrap();

    // Leave one unfetched item on the session before the lock times out.
    provider
        .enqueue_for_worker(session_activity("inst-1", 2, "session-1"))
        .await
        .unwrap();
    tokio::time::sleep(Duration::from_millis(100)).await;

    let cleaned = provider
        .cleanup_orphaned_sessions(Duration::from_secs(300))
        .await
        .unwrap();
    assert_eq!(cleaned, 0, "Should not clean up session with pending items");
}
/// Checks that `cleanup_orphaned_sessions` removes nothing when the only
/// session was fetched with a 300-second lock timeout that has not elapsed.
pub async fn test_cleanup_keeps_active_sessions(factory: &dyn ProviderFactory) {
    let provider = factory.create_provider().await;
    let tags = TagFilter::default();
    let long_lived = session_config_with_lock("worker-A", Duration::from_secs(300));

    provider
        .enqueue_for_worker(session_activity("inst-1", 1, "session-1"))
        .await
        .unwrap();
    let (_, lock_token, _) = provider
        .fetch_work_item(Duration::from_secs(5), Duration::ZERO, Some(&long_lived), &tags)
        .await
        .unwrap()
        .unwrap();
    provider.ack_work_item(&lock_token, None).await.unwrap();

    // No sleep here: cleanup runs while the session lock is still fresh.
    let cleaned = provider
        .cleanup_orphaned_sessions(Duration::from_secs(300))
        .await
        .unwrap();
    assert_eq!(cleaned, 0, "Should not clean up active session");
}
/// Checks that acking a session item counts as session activity.
///
/// The item is fetched, the test sleeps for longer than the 200 ms idle
/// window, and only then acks (with an `ActivityCompleted` completion). A
/// `renew_session_lock` call using that idle window must still renew the
/// session, which the test attributes to the ack refreshing the session's
/// last-activity time.
pub async fn test_ack_updates_session_last_activity(factory: &dyn ProviderFactory) {
    let provider = factory.create_provider().await;
    let tags = TagFilter::default();
    let idle_window = Duration::from_millis(200);
    let worker_a = session_config("worker-A");

    provider
        .enqueue_for_worker(session_activity("inst-1", 1, "session-1"))
        .await
        .unwrap();
    let (_, lock_token, _) = provider
        .fetch_work_item(Duration::from_secs(5), Duration::ZERO, Some(&worker_a), &tags)
        .await
        .unwrap()
        .unwrap();

    // Let more than the idle window pass between the fetch and the ack.
    tokio::time::sleep(idle_window + Duration::from_millis(50)).await;

    let completion = WorkItem::ActivityCompleted {
        instance: "inst-1".to_string(),
        execution_id: 1,
        id: 1,
        result: "ok".to_string(),
    };
    provider.ack_work_item(&lock_token, Some(completion)).await.unwrap();

    let renewed = provider
        .renew_session_lock(&["worker-A"], Duration::from_secs(30), idle_window)
        .await
        .unwrap();
    assert_eq!(
        renewed, 1,
        "Session should be active — ack must have updated last_activity_at"
    );
}
/// Checks that renewing a work item's lock counts as session activity.
///
/// The item is fetched, the test sleeps for longer than the 200 ms idle
/// window, and then renews the work item lock. A `renew_session_lock` call
/// using that idle window must still renew the session.
pub async fn test_renew_work_item_updates_session_last_activity(factory: &dyn ProviderFactory) {
    let provider = factory.create_provider().await;
    let tags = TagFilter::default();
    let idle_window = Duration::from_millis(200);
    let renew_lock = Duration::from_secs(30);
    let worker_a = session_config("worker-A");

    provider
        .enqueue_for_worker(session_activity("inst-1", 1, "session-1"))
        .await
        .unwrap();
    let (_, lock_token, _) = provider
        .fetch_work_item(Duration::from_secs(5), Duration::ZERO, Some(&worker_a), &tags)
        .await
        .unwrap()
        .unwrap();

    // Let more than the idle window pass before touching the work item lock.
    tokio::time::sleep(idle_window + Duration::from_millis(50)).await;
    provider.renew_work_item_lock(&lock_token, renew_lock).await.unwrap();

    let renewed = provider
        .renew_session_lock(&["worker-A"], renew_lock, idle_window)
        .await
        .unwrap();
    assert_eq!(
        renewed, 1,
        "Session should be active — work item lock renewal must have updated last_activity_at"
    );
}
/// Checks that three items enqueued on one session are fetched by the same
/// worker in enqueue order (ids 1, 2, 3), acking each before the next fetch.
///
/// A fetched item that is not an `ActivityExecute` fails the test instead of
/// silently skipping the ordering check.
pub async fn test_session_items_processed_in_order(factory: &dyn ProviderFactory) {
    let provider = factory.create_provider().await;
    let tags = TagFilter::default();
    let worker_a = session_config("worker-A");

    for id in 1..=3 {
        provider
            .enqueue_for_worker(session_activity("inst-1", id, "session-1"))
            .await
            .unwrap();
    }

    for expected_id in 1..=3u64 {
        let (work, token, _) = provider
            .fetch_work_item(Duration::from_secs(5), Duration::ZERO, Some(&worker_a), &tags)
            .await
            .unwrap()
            .unwrap_or_else(|| panic!("Should fetch session item {expected_id}"));
        match &work {
            WorkItem::ActivityExecute { id, .. } => {
                assert_eq!(*id, expected_id, "Items should be fetched in FIFO order");
            }
            _ => panic!("Expected ActivityExecute work item"),
        }
        provider.ack_work_item(&token, None).await.unwrap();
    }
}
/// Checks that a worker which has already fetched a session item still
/// receives non-session items when it fetches with its session config.
///
/// A fetched item that is not an `ActivityExecute` fails the test instead of
/// silently skipping the session-id check.
pub async fn test_non_session_items_returned_with_session_config(factory: &dyn ProviderFactory) {
    let provider = factory.create_provider().await;
    let tags = TagFilter::default();
    let work_lock = Duration::from_secs(5);
    let worker_a = session_config("worker-A");

    // Worker-A fetches and acks a session item first.
    provider
        .enqueue_for_worker(session_activity("inst-1", 1, "session-1"))
        .await
        .unwrap();
    let (_, session_token, _) = provider
        .fetch_work_item(work_lock, Duration::ZERO, Some(&worker_a), &tags)
        .await
        .unwrap()
        .unwrap();
    provider.ack_work_item(&session_token, None).await.unwrap();

    // A plain item must be returned to the same worker and config.
    provider.enqueue_for_worker(plain_activity("inst-2", 1)).await.unwrap();
    let fetched = provider
        .fetch_work_item(work_lock, Duration::ZERO, Some(&worker_a), &tags)
        .await
        .unwrap();
    match &fetched {
        Some((WorkItem::ActivityExecute { session_id, .. }, _, _)) => {
            assert!(session_id.is_none(), "Should be a non-session item");
        }
        Some(_) => panic!("Expected ActivityExecute work item"),
        None => panic!("Non-session items should be returned with session config"),
    }
}
/// Checks that repeated fetches using the same owner id (`process-1`) keep
/// receiving items of the session that id holds, while a different owner id
/// (`process-2`) gets nothing for that session.
///
/// A fetched item that is not an `ActivityExecute` fails the test instead of
/// silently skipping the id checks.
pub async fn test_shared_worker_id_any_caller_can_fetch_owned_session(factory: &dyn ProviderFactory) {
    let provider = factory.create_provider().await;
    let tags = TagFilter::default();
    let work_lock = Duration::from_secs(5);
    let process_1 = session_config("process-1");
    let process_2 = session_config("process-2");

    provider
        .enqueue_for_worker(session_activity("inst-1", 1, "shared-sess"))
        .await
        .unwrap();
    provider
        .enqueue_for_worker(session_activity("inst-1", 2, "shared-sess"))
        .await
        .unwrap();

    // Two consecutive fetches under the same owner id return items 1 and 2.
    for expected_id in [1u64, 2] {
        let (work, token, _) = provider
            .fetch_work_item(work_lock, Duration::ZERO, Some(&process_1), &tags)
            .await
            .unwrap()
            .unwrap();
        match &work {
            WorkItem::ActivityExecute { id, .. } => assert_eq!(*id, expected_id),
            _ => panic!("Expected ActivityExecute work item"),
        }
        provider.ack_work_item(&token, None).await.unwrap();
    }

    // A third item on the session must not be handed to another owner id.
    provider
        .enqueue_for_worker(session_activity("inst-1", 3, "shared-sess"))
        .await
        .unwrap();
    let other = provider
        .fetch_work_item(work_lock, Duration::ZERO, Some(&process_2), &tags)
        .await
        .unwrap();
    assert!(
        other.is_none(),
        "Different process should not get items for an owned session"
    );
}
/// Checks that once worker-A has fetched an item of a session (and has not
/// acked it yet), worker-B cannot fetch the session's other queued item.
///
/// Despite the name, the two fetches are issued one after the other on the
/// same task, not in parallel.
pub async fn test_concurrent_session_claim_only_one_wins(factory: &dyn ProviderFactory) {
    let provider = factory.create_provider().await;
    let tags = TagFilter::default();
    let work_lock = Duration::from_secs(5);
    let worker_a = session_config("worker-A");
    let worker_b = session_config("worker-B");

    for id in [1u64, 2] {
        provider
            .enqueue_for_worker(session_activity("inst-1", id, "contested-sess"))
            .await
            .unwrap();
    }

    // Worker-A gets in first and keeps its item un-acked for now.
    let (_, token_a, _) = provider
        .fetch_work_item(work_lock, Duration::ZERO, Some(&worker_a), &tags)
        .await
        .unwrap()
        .expect("Worker-A should claim the session");

    let for_b = provider
        .fetch_work_item(work_lock, Duration::ZERO, Some(&worker_b), &tags)
        .await
        .unwrap();
    assert!(
        for_b.is_none(),
        "Worker-B should NOT get session items while A holds the session lock"
    );

    provider.ack_work_item(&token_a, None).await.unwrap();
}
/// Checks that worker-B can take over a session, and receive its queued
/// item, after worker-A's 50 ms session lock timeout has elapsed.
///
/// Unlike `test_session_claimable_after_lock_expiry`, the follow-up item is
/// enqueued before the sleep. A fetched item that is not an `ActivityExecute`
/// fails the test instead of silently skipping the session-id check.
pub async fn test_session_takeover_after_lock_expiry(factory: &dyn ProviderFactory) {
    let provider = factory.create_provider().await;
    let tags = TagFilter::default();
    let work_lock = Duration::from_secs(5);
    let short_lived_a = session_config_with_lock("worker-A", Duration::from_millis(50));
    let worker_b = session_config("worker-B");

    // Worker-A fetches with a short session lock and acks.
    provider
        .enqueue_for_worker(session_activity("inst-1", 1, "takeover-sess"))
        .await
        .unwrap();
    let (_, token_a, _) = provider
        .fetch_work_item(work_lock, Duration::ZERO, Some(&short_lived_a), &tags)
        .await
        .unwrap()
        .unwrap();
    provider.ack_work_item(&token_a, None).await.unwrap();

    // Queue the next item, then wait longer than the 50 ms session lock.
    provider
        .enqueue_for_worker(session_activity("inst-1", 2, "takeover-sess"))
        .await
        .unwrap();
    tokio::time::sleep(Duration::from_millis(100)).await;

    let for_b = provider
        .fetch_work_item(work_lock, Duration::ZERO, Some(&worker_b), &tags)
        .await
        .unwrap();
    match &for_b {
        Some((WorkItem::ActivityExecute { session_id, .. }, _, _)) => {
            assert_eq!(session_id.as_deref(), Some("takeover-sess"));
        }
        Some(_) => panic!("Expected ActivityExecute work item"),
        None => panic!("Worker-B should take over session after A's lock expired"),
    }
}
/// Checks that a session id can be used again after
/// `cleanup_orphaned_sessions` has removed the expired session.
///
/// Worker-A's 50 ms session lock is allowed to time out, cleanup must report
/// one removed session, and a new item with the same session id must then be
/// fetchable by worker-B. A fetched item that is not an `ActivityExecute`
/// fails the test instead of silently skipping the payload checks.
pub async fn test_cleanup_then_new_item_recreates_session(factory: &dyn ProviderFactory) {
    let provider = factory.create_provider().await;
    let tags = TagFilter::default();
    let work_lock = Duration::from_secs(5);
    let short_lived_a = session_config_with_lock("worker-A", Duration::from_millis(50));
    let worker_b = session_config("worker-B");

    provider
        .enqueue_for_worker(session_activity("inst-1", 1, "ephemeral-sess"))
        .await
        .unwrap();
    let (_, token_a, _) = provider
        .fetch_work_item(work_lock, Duration::ZERO, Some(&short_lived_a), &tags)
        .await
        .unwrap()
        .unwrap();
    provider.ack_work_item(&token_a, None).await.unwrap();

    // Wait longer than the 50 ms session lock, then clean up.
    tokio::time::sleep(Duration::from_millis(100)).await;
    let cleaned = provider
        .cleanup_orphaned_sessions(Duration::from_secs(300))
        .await
        .unwrap();
    assert_eq!(cleaned, 1, "Should clean up the expired session");

    // The same session id must be usable again by another worker.
    provider
        .enqueue_for_worker(session_activity("inst-1", 2, "ephemeral-sess"))
        .await
        .unwrap();
    let for_b = provider
        .fetch_work_item(work_lock, Duration::ZERO, Some(&worker_b), &tags)
        .await
        .unwrap();
    match &for_b {
        Some((WorkItem::ActivityExecute { session_id, id, .. }, _, _)) => {
            assert_eq!(session_id.as_deref(), Some("ephemeral-sess"));
            assert_eq!(*id, 2);
        }
        Some(_) => panic!("Expected ActivityExecute work item"),
        None => panic!("Session should be re-creatable after cleanup"),
    }
}
/// Checks that an abandoned session item is redelivered to the same worker
/// with an incremented attempt count, and disappears once acked.
///
/// A fetched item that is not an `ActivityExecute` fails the test instead of
/// silently skipping the id checks.
pub async fn test_abandoned_session_item_retryable(factory: &dyn ProviderFactory) {
    let provider = factory.create_provider().await;
    let tags = TagFilter::default();
    let work_lock = Duration::from_secs(5);
    let worker_a = session_config("worker-A");

    provider
        .enqueue_for_worker(session_activity("inst-1", 1, "retry-sess"))
        .await
        .unwrap();

    // First delivery: attempt 1, then abandon with no delay.
    let (first_item, first_token, first_attempt) = provider
        .fetch_work_item(work_lock, Duration::ZERO, Some(&worker_a), &tags)
        .await
        .unwrap()
        .unwrap();
    assert_eq!(first_attempt, 1);
    match &first_item {
        WorkItem::ActivityExecute { id, .. } => assert_eq!(*id, 1),
        _ => panic!("Expected ActivityExecute work item"),
    }
    provider.abandon_work_item(&first_token, None, false).await.unwrap();

    // Second delivery: same item, attempt 2, then ack.
    let (retry_item, retry_token, retry_attempt) = provider
        .fetch_work_item(work_lock, Duration::ZERO, Some(&worker_a), &tags)
        .await
        .unwrap()
        .unwrap();
    assert_eq!(retry_attempt, 2, "Retry should increment attempt count");
    match &retry_item {
        WorkItem::ActivityExecute { id, .. } => assert_eq!(*id, 1, "Should be the same item"),
        _ => panic!("Expected ActivityExecute work item"),
    }
    provider.ack_work_item(&retry_token, None).await.unwrap();

    let leftover = provider
        .fetch_work_item(work_lock, Duration::ZERO, Some(&worker_a), &tags)
        .await
        .unwrap();
    assert!(leftover.is_none(), "No items should remain after successful ack");
}
/// Checks that abandoning a session item with the ignore-attempt flag set
/// leaves the attempt count at 1 on redelivery.
pub async fn test_abandoned_session_item_ignore_attempt(factory: &dyn ProviderFactory) {
    let provider = factory.create_provider().await;
    let tags = TagFilter::default();
    let work_lock = Duration::from_secs(5);
    let worker_a = session_config("worker-A");

    provider
        .enqueue_for_worker(session_activity("inst-1", 1, "ignore-sess"))
        .await
        .unwrap();
    let (_, first_token, first_attempt) = provider
        .fetch_work_item(work_lock, Duration::ZERO, Some(&worker_a), &tags)
        .await
        .unwrap()
        .unwrap();
    assert_eq!(first_attempt, 1);

    // Abandon with a 1 ms delay and the ignore-attempt flag, then wait it out.
    let abandon_delay = Some(Duration::from_millis(1));
    provider
        .abandon_work_item(&first_token, abandon_delay, true)
        .await
        .unwrap();
    tokio::time::sleep(Duration::from_millis(10)).await;

    let (_, retry_token, retry_attempt) = provider
        .fetch_work_item(work_lock, Duration::ZERO, Some(&worker_a), &tags)
        .await
        .unwrap()
        .unwrap();
    assert_eq!(retry_attempt, 1, "ignore_attempt should not inflate attempt count");
    provider.ack_work_item(&retry_token, None).await.unwrap();
}
/// Checks that `renew_session_lock` returns 0 when the worker's only session
/// was fetched with a 50 ms lock timeout that has already elapsed.
pub async fn test_renew_session_lock_after_expiry_returns_zero(factory: &dyn ProviderFactory) {
    let provider = factory.create_provider().await;
    let tags = TagFilter::default();
    let short_lived = session_config_with_lock("worker-A", Duration::from_millis(50));

    provider
        .enqueue_for_worker(session_activity("inst-1", 1, "expired-sess"))
        .await
        .unwrap();
    let (_, lock_token, _) = provider
        .fetch_work_item(Duration::from_secs(5), Duration::ZERO, Some(&short_lived), &tags)
        .await
        .unwrap()
        .unwrap();
    provider.ack_work_item(&lock_token, None).await.unwrap();

    // Wait longer than the 50 ms session lock timeout before renewing.
    tokio::time::sleep(Duration::from_millis(100)).await;

    let owners = ["worker-A"];
    let renewed = provider
        .renew_session_lock(&owners, Duration::from_secs(30), Duration::from_secs(300))
        .await
        .unwrap();
    assert_eq!(renewed, 0, "renew_session_lock should return 0 for expired session");
}
/// Checks that the worker whose 50 ms session lock timed out can fetch the
/// session's next item again itself.
///
/// A fetched item that is not an `ActivityExecute` fails the test instead of
/// silently skipping the payload checks.
pub async fn test_original_worker_reclaims_expired_session(factory: &dyn ProviderFactory) {
    let provider = factory.create_provider().await;
    let tags = TagFilter::default();
    let work_lock = Duration::from_secs(5);
    let short_lived_a = session_config_with_lock("worker-A", Duration::from_millis(50));
    let worker_a = session_config("worker-A");

    provider
        .enqueue_for_worker(session_activity("inst-1", 1, "reclaim-sess"))
        .await
        .unwrap();
    let (_, first_token, _) = provider
        .fetch_work_item(work_lock, Duration::ZERO, Some(&short_lived_a), &tags)
        .await
        .unwrap()
        .unwrap();
    provider.ack_work_item(&first_token, None).await.unwrap();

    // Queue the next item, then wait longer than the 50 ms session lock.
    provider
        .enqueue_for_worker(session_activity("inst-1", 2, "reclaim-sess"))
        .await
        .unwrap();
    tokio::time::sleep(Duration::from_millis(100)).await;

    let reclaimed = provider
        .fetch_work_item(work_lock, Duration::ZERO, Some(&worker_a), &tags)
        .await
        .unwrap();
    match &reclaimed {
        Some((WorkItem::ActivityExecute { id, session_id, .. }, _, _)) => {
            assert_eq!(*id, 2);
            assert_eq!(session_id.as_deref(), Some("reclaim-sess"));
        }
        Some(_) => panic!("Expected ActivityExecute work item"),
        None => panic!("Original worker should re-claim expired session"),
    }
}
/// Checks redelivery when the work item lock times out but the session lock
/// does not.
///
/// Worker-A fetches with a 500 ms work item lock and the default 30-second
/// session lock, then never acks. After the work item lock has elapsed,
/// worker-B must still get nothing, while worker-A must receive the same item
/// again with attempt count 2.
///
/// A refetched item that is not an `ActivityExecute` fails the test instead
/// of silently skipping the checks and the ack.
pub async fn test_activity_lock_expires_session_lock_valid_same_worker_refetches(factory: &dyn ProviderFactory) {
    let provider = factory.create_provider().await;
    let tags = TagFilter::default();
    let short_work_lock = Duration::from_millis(500);
    let normal_work_lock = Duration::from_secs(5);
    let worker_a = session_config("worker-A");
    let worker_b = session_config("worker-B");

    provider
        .enqueue_for_worker(session_activity("inst-1", 1, "sticky-sess"))
        .await
        .unwrap();

    // Worker-A takes the item with a short work item lock and never acks it.
    let (_, _token_a, _) = provider
        .fetch_work_item(short_work_lock, Duration::ZERO, Some(&worker_a), &tags)
        .await
        .unwrap()
        .unwrap();
    tokio::time::sleep(short_work_lock + Duration::from_millis(100)).await;

    let for_b = provider
        .fetch_work_item(normal_work_lock, Duration::ZERO, Some(&worker_b), &tags)
        .await
        .unwrap();
    assert!(
        for_b.is_none(),
        "Worker-B should not get session-bound item — session still owned by A"
    );

    let for_a = provider
        .fetch_work_item(normal_work_lock, Duration::ZERO, Some(&worker_a), &tags)
        .await
        .unwrap();
    match for_a {
        Some((WorkItem::ActivityExecute { id, .. }, token, attempt)) => {
            assert_eq!(id, 1, "Should be the same item");
            assert_eq!(attempt, 2, "Attempt count should have incremented");
            provider.ack_work_item(&token, None).await.unwrap();
        }
        Some(_) => panic!("Expected ActivityExecute work item"),
        None => panic!("Worker-A should reclaim the redelivered item via session affinity"),
    }
}
/// Checks that after worker-A's session lock times out, worker-B takes over
/// the session and also receives the item worker-A had in flight once that
/// item is abandoned.
///
/// Worker-A fetches item 1 with a 200 ms session lock and a 5-second work
/// item lock and does not ack. After the session lock has elapsed, item 2 is
/// enqueued and worker-B must fetch it. Worker-A's token is then abandoned,
/// and worker-B's next fetch must return item 1 with attempt count 2.
///
/// A fetched item that is not an `ActivityExecute` fails the test instead of
/// silently skipping the checks and the acks.
pub async fn test_session_lock_expires_new_owner_gets_redelivery(factory: &dyn ProviderFactory) {
    let provider = factory.create_provider().await;
    let tags = TagFilter::default();
    let short_session = Duration::from_millis(200);
    let long_activity = Duration::from_secs(5);
    let cfg_a = session_config_with_lock("worker-A", short_session);
    let cfg_b = session_config("worker-B");

    provider
        .enqueue_for_worker(session_activity("inst-1", 1, "takeover2-sess"))
        .await
        .unwrap();

    // Worker-A holds item 1 un-acked while its session lock times out.
    let (_, token_a, _) = provider
        .fetch_work_item(long_activity, Duration::ZERO, Some(&cfg_a), &tags)
        .await
        .unwrap()
        .unwrap();
    tokio::time::sleep(short_session + Duration::from_millis(100)).await;

    provider
        .enqueue_for_worker(session_activity("inst-1", 2, "takeover2-sess"))
        .await
        .unwrap();

    // Item 1 is still locked by worker-A, so worker-B is expected to get item 2.
    let takeover = provider
        .fetch_work_item(long_activity, Duration::ZERO, Some(&cfg_b), &tags)
        .await
        .unwrap();
    match &takeover {
        Some((WorkItem::ActivityExecute { id, session_id, .. }, token_b, _)) => {
            assert_eq!(*id, 2, "Worker-B should get the NEW item");
            assert_eq!(session_id.as_deref(), Some("takeover2-sess"));
            provider.ack_work_item(token_b, None).await.unwrap();
        }
        Some(_) => panic!("Expected ActivityExecute work item"),
        None => panic!("Worker-B should claim the expired session and get item 2"),
    }

    // The outcome of the abandon is deliberately discarded, exactly as before.
    // NOTE(review): presumably a provider may reject this call now that
    // worker-A no longer holds the session — confirm the intended contract.
    let _ = provider.abandon_work_item(&token_a, None, false).await;

    let redelivered = provider
        .fetch_work_item(long_activity, Duration::ZERO, Some(&cfg_b), &tags)
        .await
        .unwrap();
    match redelivered {
        Some((WorkItem::ActivityExecute { id, .. }, token, attempt)) => {
            assert_eq!(id, 1, "Should be the original item");
            assert_eq!(attempt, 2, "Attempt count should have incremented");
            provider.ack_work_item(&token, None).await.unwrap();
        }
        Some(_) => panic!("Expected ActivityExecute work item"),
        None => panic!("Worker-B should get redelivered item 1 as new session owner"),
    }
}
/// Checks that a worker can reacquire its own session after the 200 ms
/// session lock has elapsed, using the very same session config.
///
/// A fetched item that is not an `ActivityExecute` fails the test instead of
/// silently skipping the checks and the ack.
pub async fn test_session_lock_expires_same_worker_reacquires(factory: &dyn ProviderFactory) {
    let provider = factory.create_provider().await;
    let tags = TagFilter::default();
    let work_lock = Duration::from_secs(5);
    let short_session = Duration::from_millis(200);
    let cfg_a = session_config_with_lock("worker-A", short_session);

    provider
        .enqueue_for_worker(session_activity("inst-1", 1, "reacq-sess"))
        .await
        .unwrap();
    let (_, first_token, _) = provider
        .fetch_work_item(work_lock, Duration::ZERO, Some(&cfg_a), &tags)
        .await
        .unwrap()
        .unwrap();
    provider.ack_work_item(&first_token, None).await.unwrap();

    // Let the session lock time out before the next item arrives.
    tokio::time::sleep(short_session + Duration::from_millis(100)).await;
    provider
        .enqueue_for_worker(session_activity("inst-1", 2, "reacq-sess"))
        .await
        .unwrap();

    let reacquired = provider
        .fetch_work_item(work_lock, Duration::ZERO, Some(&cfg_a), &tags)
        .await
        .unwrap();
    match reacquired {
        Some((WorkItem::ActivityExecute { id, session_id, .. }, token, _)) => {
            assert_eq!(id, 2);
            assert_eq!(session_id.as_deref(), Some("reacq-sess"));
            provider.ack_work_item(&token, None).await.unwrap();
        }
        Some(_) => panic!("Expected ActivityExecute work item"),
        None => panic!("Worker-A should reacquire its own expired session"),
    }
}
/// Checks that when both the work item lock and the session lock (200 ms
/// each) have elapsed without an ack, a different worker receives the same
/// item with attempt count 2.
///
/// A fetched item that is not an `ActivityExecute` fails the test instead of
/// silently skipping the checks and the ack.
pub async fn test_both_locks_expire_different_worker_claims(factory: &dyn ProviderFactory) {
    let provider = factory.create_provider().await;
    let tags = TagFilter::default();
    let short_lock = Duration::from_millis(200);
    let cfg_a = session_config_with_lock("worker-A", short_lock);
    let cfg_b = session_config("worker-B");

    provider
        .enqueue_for_worker(session_activity("inst-1", 1, "both-expire-sess"))
        .await
        .unwrap();

    // Worker-A fetches with both locks short and never acks.
    let (_, _token_a, _) = provider
        .fetch_work_item(short_lock, Duration::ZERO, Some(&cfg_a), &tags)
        .await
        .unwrap()
        .unwrap();
    tokio::time::sleep(short_lock + Duration::from_millis(200)).await;

    let for_b = provider
        .fetch_work_item(Duration::from_secs(5), Duration::ZERO, Some(&cfg_b), &tags)
        .await
        .unwrap();
    match for_b {
        Some((WorkItem::ActivityExecute { id, session_id, .. }, token, attempt)) => {
            assert_eq!(id, 1, "Should be the same item");
            assert_eq!(session_id.as_deref(), Some("both-expire-sess"));
            assert_eq!(attempt, 2, "Attempt count should have incremented");
            provider.ack_work_item(&token, None).await.unwrap();
        }
        Some(_) => panic!("Expected ActivityExecute work item"),
        None => panic!("Worker-B should claim item after both locks expired"),
    }
}
/// Checks that acking a work item still succeeds after the 200 ms session
/// lock has elapsed, as long as the 30-second work item lock has not.
pub async fn test_session_lock_expires_activity_lock_valid_ack_succeeds(factory: &dyn ProviderFactory) {
    let provider = factory.create_provider().await;
    let tags = TagFilter::default();
    let short_session_lock = Duration::from_millis(200);
    let long_work_lock = Duration::from_secs(30);
    let cfg_a = session_config_with_lock("worker-A", short_session_lock);

    provider
        .enqueue_for_worker(session_activity("inst-1", 1, "sess-expires"))
        .await
        .unwrap();
    let (_, lock_token, _) = provider
        .fetch_work_item(long_work_lock, Duration::ZERO, Some(&cfg_a), &tags)
        .await
        .unwrap()
        .unwrap();

    // Only the session lock is allowed to time out here.
    tokio::time::sleep(short_session_lock + Duration::from_millis(100)).await;

    let completion = WorkItem::ActivityCompleted {
        instance: "inst-1".to_string(),
        execution_id: 1,
        id: 1,
        result: "done".to_string(),
    };
    let ack_outcome = provider.ack_work_item(&lock_token, Some(completion)).await;
    assert!(
        ack_outcome.is_ok(),
        "Ack must succeed — activity lock is still valid even though session lock expired"
    );
}
/// Checks that renewing a session lock keeps the session with its owner
/// beyond the point where the original lock would have timed out.
///
/// Worker-A fetches with a 1-second session lock, renews it after 600 ms, and
/// the test waits another 400 ms, which puts it at or past the original
/// 1-second timeout. A new item on the session must then be refused to
/// worker-B and delivered to worker-A.
pub async fn test_session_lock_renewal_extends_past_original_timeout(factory: &dyn ProviderFactory) {
    let provider = factory.create_provider().await;
    let tags = TagFilter::default();
    let work_lock = Duration::from_secs(5);
    let session_lock_dur = Duration::from_secs(1);
    let cfg_a = session_config_with_lock("worker-A", session_lock_dur);

    provider
        .enqueue_for_worker(session_activity("inst-1", 1, "renew-ext-sess"))
        .await
        .unwrap();
    let (_, first_token, _) = provider
        .fetch_work_item(work_lock, Duration::ZERO, Some(&cfg_a), &tags)
        .await
        .unwrap()
        .unwrap();
    provider.ack_work_item(&first_token, None).await.unwrap();

    // Renew part-way through the original lock period.
    tokio::time::sleep(Duration::from_millis(600)).await;
    let renewed = provider
        .renew_session_lock(&["worker-A"], session_lock_dur, Duration::from_secs(300))
        .await
        .unwrap();
    assert_eq!(renewed, 1, "Should renew the active session");

    // 600 ms + 400 ms reaches the original 1-second timeout.
    tokio::time::sleep(Duration::from_millis(400)).await;
    provider
        .enqueue_for_worker(session_activity("inst-1", 2, "renew-ext-sess"))
        .await
        .unwrap();

    let worker_b = session_config("worker-B");
    let for_b = provider
        .fetch_work_item(work_lock, Duration::ZERO, Some(&worker_b), &tags)
        .await
        .unwrap();
    assert!(
        for_b.is_none(),
        "Session should still be owned by A after renewal past original timeout"
    );

    let worker_a = session_config("worker-A");
    let for_a = provider
        .fetch_work_item(work_lock, Duration::ZERO, Some(&worker_a), &tags)
        .await
        .unwrap();
    assert!(
        for_a.is_some(),
        "Worker-A should still own the session after renewal"
    );
}