mod common;
#[cfg(test)]
mod test {
use std::time::Duration;
use tokio::join;
use tracing::info;
use tracing::trace;
use mscheduler::tasker::consumer::{ConsumerEvent, TaskConsumer, TaskConsumerConfig};
use mscheduler::tasker::producer::{SendTaskOption, TaskProducer};
use mscheduler::tasker::task::Task;
use crate::common::test::{init_collection_for_test, spawn_check_handler, spawn_running_consumer_handler};
use crate::common::test_consume_func::{TestConsumeFunc, TestConsumeParam};
#[test_log::test(tokio::test)]
pub async fn test_occupy_task() {
let collection_name = "test_occupy_task";
let worker_id1 = "aaa";
let task_consumer = TaskConsumer::create(init_collection_for_test(collection_name).await, TestConsumeFunc {}, TaskConsumerConfig::builder().worker_id(worker_id1).build()).await.expect("failed to create consumer");
let task_producer = TaskProducer::create(init_collection_for_test::<Task<TestConsumeParam, i32>>(collection_name).await).expect("failed to create producer");
spawn_running_consumer_handler(task_consumer.clone());
let occupy_handle = spawn_check_handler(task_consumer.clone(), |event| {
match event {
ConsumerEvent::TaskOccupyResult { key, success } => {
key == "111" && *success
}
_ => {
false
}
}
}, Duration::from_secs(5));
task_producer.send_task("111", TestConsumeParam::default(), None).await.expect("failed to send task");
let occupy_result = occupy_handle.await;
info!("wait occupy completed");
assert!(occupy_result.expect("failed to wait 4 event").is_some());
}
#[test_log::test(tokio::test)]
pub async fn test_concurrent_occupy_task() {
let collection_name = "test_concurrent_occupy_task";
let worker_id1 = "aaa";
let worker_id2 = "bbb";
let worker_id3 = "ccc";
let task_consumer = TaskConsumer::create(init_collection_for_test::<Task<TestConsumeParam, i32>>(collection_name).await, TestConsumeFunc {}, TaskConsumerConfig::builder().worker_id(worker_id1).build()).await.expect("failed to create consumer");
let task_consumer2 = TaskConsumer::create(init_collection_for_test::<Task<TestConsumeParam, i32>>(collection_name).await, TestConsumeFunc {}, TaskConsumerConfig::builder().worker_id(worker_id2).build()).await.expect("failed to create consumer2");
let task_consumer3 = TaskConsumer::create(init_collection_for_test::<Task<TestConsumeParam, i32>>(collection_name).await, TestConsumeFunc {}, TaskConsumerConfig::builder().worker_id(worker_id3).build()).await.expect("failed to create consumer3");
let task_producer = TaskProducer::create(init_collection_for_test::<Task<TestConsumeParam, i32>>(collection_name).await).expect("failed to create producer");
spawn_running_consumer_handler(task_consumer.clone());
spawn_running_consumer_handler(task_consumer2.clone());
spawn_running_consumer_handler(task_consumer3.clone());
let occupy_handle = spawn_check_handler(task_consumer.clone(), |event| {
match event {
ConsumerEvent::TaskOccupyResult { key, success } => {
key == "111" && *success
}
_ => {
false
}
}
}, Duration::from_secs(5));
let occupy2_handle = spawn_check_handler(task_consumer2.clone(), |event| {
match event {
ConsumerEvent::TaskOccupyResult { key, success } => {
key == "111" && *success
}
_ => {
false
}
}
}, Duration::from_secs(5));
let occupy3_handle = spawn_check_handler(task_consumer3.clone(), |event| {
match event {
ConsumerEvent::TaskOccupyResult { key, success } => {
key == "111" && *success
}
_ => {
false
}
}
}, Duration::from_secs(5));
task_producer.send_task("111", TestConsumeParam::default(), Some(SendTaskOption::builder().concurrency_cnt(2_u32).build())).await.expect("failed to send task");
let (occupy_result, occupy_result2, occupy_result3) = join!(occupy_handle, occupy2_handle, occupy3_handle);
info!("wait occupy completed");
let success_cnt = [occupy_result.expect("failed to wait").is_some(), occupy_result2.expect("failed to wait").is_some(), occupy_result3.expect("failed to wait").is_some()].iter()
.filter(|&&is_success| is_success)
.count();
assert_eq!(success_cnt, 2, "success cnt should be 2");
}
#[test_log::test(tokio::test)]
pub async fn test_occupy_existing_task() {
let collection_name = "test_occupy_existing_task";
let collection1 = init_collection_for_test::<Task<TestConsumeParam, i32>>(collection_name).await;
let collection2 = init_collection_for_test::<Task<TestConsumeParam, i32>>(collection_name).await;
let task_producer = TaskProducer::create(collection1).expect("failed to create producer");
let send_task_option = SendTaskOption::builder().concurrency_cnt(1_u32).worker_timeout_ms(3000_u32).build();
task_producer.send_task("111", TestConsumeParam::builder().timeout_sec(2_u32).emit_error(true).build(), Some(send_task_option)).await.expect("failed to send task");
let worker_id1 = "aaa";
let config = TaskConsumerConfig::builder().worker_id(worker_id1).build();
let task_consumer = TaskConsumer::create(collection2, TestConsumeFunc {}, config).await.expect("failed to create consumer");
let occupy_handle = spawn_check_handler(task_consumer.clone(), |event| {
match event {
ConsumerEvent::TaskOccupyResult { key, success } => {
key == "111" && *success
}
_ => {
false
}
}
}, Duration::from_secs(5));
spawn_running_consumer_handler(task_consumer.clone());
assert!(occupy_handle.await.expect("failed to wait").is_some(), "cannot find success event");
}
#[test_log::test(tokio::test)]
pub async fn test_occupy_expired_task() {
let collection_name = "test_occupy_expired_task";
let collection1 = init_collection_for_test::<Task<TestConsumeParam, i32>>(collection_name).await;
let collection2 = init_collection_for_test::<Task<TestConsumeParam, i32>>(collection_name).await;
let collection3 = init_collection_for_test::<Task<TestConsumeParam, i32>>(collection_name).await;
{
let task_producer = TaskProducer::create(collection1).expect("failed to create producer");
let send_task_option = SendTaskOption::builder().concurrency_cnt(1_u32).ping_interval_ms(1_u32).worker_timeout_ms(5_u32).build();
task_producer.send_task("111", TestConsumeParam::builder().timeout_sec(100_u32).emit_error(true).build(), Some(send_task_option)).await.expect("failed to send task");
}
let worker_id1 = "aaa";
let task_consumer1 = TaskConsumer::create(collection2, TestConsumeFunc {}, TaskConsumerConfig::builder().worker_id(worker_id1).build().clone()).await.expect("failed to create consumer");
let occupy_handle = spawn_check_handler(task_consumer1.clone(), |event| {
match event {
ConsumerEvent::TaskOccupyResult { key, success } => {
key == "111" && *success
}
_ => {
false
}
}
}, Duration::from_secs(5));
let consumer_handler = spawn_running_consumer_handler(task_consumer1.clone());
occupy_handle.await.expect("failed to occupy task").expect("task not occupied");
drop(consumer_handler);
tokio::time::sleep(Duration::from_secs(7)).await;
let worker_id2 = "bbb";
let task_consumer2 = TaskConsumer::create(collection3, TestConsumeFunc {}, TaskConsumerConfig::builder().worker_id(worker_id2).build()).await.expect("failed to create consumer");
let occupy_handle2 = spawn_check_handler(task_consumer2.clone(), |event| {
trace!("event={}",&event);
match event {
ConsumerEvent::TaskOccupyResult { key, success } => {
key == "111" && *success
}
_ => {
false
}
}
}, Duration::from_secs(7));
spawn_running_consumer_handler(task_consumer2.clone());
occupy_handle2.await.expect("failed to occupy task by 2").expect("task not occupied by 2");
}
}