mod common;
#[cfg(test)]
mod test {
use mongodb::bson::{doc, DateTime};
use mscheduler::tasker::producer::{SendTaskOption, TaskProducer};
use mscheduler::tasker::task_common::ensure_index;
use crate::common::test::init_collection_for_test;
/// Sends one task into an emptied collection and verifies both the stored
/// document (key and params) and the flags on the returned send result
/// (`insert_new` set, `update_existing` clear).
#[tokio::test]
async fn test_send_new_task() {
    // Start from an empty collection so the send below cannot hit an existing key.
    let collection = init_collection_for_test("test_send_new_task").await;
    collection
        .delete_many(doc! {})
        .await
        .expect("failed to clear collection");
    ensure_index(&collection).await;

    let producer = TaskProducer::<_, i32>::create(collection.clone())
        .expect("failed to generate producer");

    // Payload derived from the clock; `% 1000` keeps it well inside `i32` range.
    let payload = (DateTime::now().timestamp_millis() % 1000) as i32;
    let key = "111";

    let outcome = producer
        .send_task(key, payload, None)
        .await
        .expect("failed to send new task");

    // Read the document back by key and compare it with what was sent.
    let lookup = doc! {"key":key};
    let stored = collection
        .find_one(lookup)
        .await
        .expect("failed to find new task in db")
        .expect("no task returns");

    assert_eq!(stored.params, Some(payload));
    assert_eq!(stored.key, key);

    // The send must be reported as an insert, not as an update.
    assert!(!outcome.update_existing);
    assert!(outcome.insert_new);
}
/// Sends the same key four times and checks the stored task after each send:
///
/// 1. first send: stored with the given params, reported as `insert_new`;
/// 2. resend with different params and no options: stored params unchanged;
/// 3. resend with `update_existing_params(true)`: stored params replaced;
/// 4. resend with a `run_time`: reported as `update_existing`, and the
///    stored `task_state.start_time` equals that run time.
#[tokio::test]
async fn test_send_duplicate_task() {
    let collection = init_collection_for_test("test_send_duplicate_task").await;
    collection
        .delete_many(doc! {})
        .await
        .expect("failed to clear collection");
    ensure_index(&collection).await;

    let producer = TaskProducer::<_, i32>::create(collection.clone())
        .expect("failed to generate producer");

    // `% 1000` keeps the clock-derived payload well inside `i32` range.
    let payload = (DateTime::now().timestamp_millis() % 1000) as i32;
    let bumped = payload + 1;
    let key = "111";
    // Every lookup below uses the same filter; it is cloned per query.
    let by_key = doc! {"key":key};

    // Phase 1: first send of the key.
    let first_outcome = producer
        .send_task(key, payload, None)
        .await
        .expect("failed to send new task");
    let after_insert = collection
        .find_one(by_key.clone())
        .await
        .expect("failed to find new task in db")
        .expect("no task returns");
    assert_eq!(after_insert.params, Some(payload));
    assert_eq!(after_insert.key, key);
    assert!(!first_outcome.update_existing);
    assert!(first_outcome.insert_new);

    // Phase 2: same key, different params, no options; stored params stay as they were.
    producer
        .send_task(key, bumped, None)
        .await
        .expect("failed to send new task");
    let after_plain_resend = collection
        .find_one(by_key.clone())
        .await
        .expect("failed to find new task in db")
        .expect("no task returns");
    assert_eq!(after_plain_resend.params, Some(payload));
    assert_eq!(after_plain_resend.key, key);

    // Phase 3: same key with `update_existing_params(true)`; stored params are replaced.
    let overwrite_params = SendTaskOption::builder()
        .update_existing_params(true)
        .build();
    producer
        .send_task(key, bumped, Some(overwrite_params))
        .await
        .expect("failed to send new task");
    let after_param_update = collection
        .find_one(by_key.clone())
        .await
        .expect("failed to find new task in db")
        .expect("no task returns");
    assert_eq!(after_param_update.params, Some(bumped));
    assert_eq!(after_param_update.key, key);

    // Phase 4: same key with an explicit run time; reported as an update of the existing task.
    let run_time = DateTime::parse_rfc3339_str("2030-04-12T23:20:50.52Z").unwrap();
    let reschedule = SendTaskOption::builder().run_time(Some(run_time)).build();
    let reschedule_outcome = producer
        .send_task(key, bumped, Some(reschedule))
        .await
        .expect("failed to send new task");
    assert!(reschedule_outcome.update_existing);
    assert!(!reschedule_outcome.insert_new);

    let after_reschedule = collection
        .find_one(by_key)
        .await
        .expect("failed to find new task in db")
        .expect("no task returns");
    assert!(after_reschedule.params.is_some());
    assert_eq!(after_reschedule.task_state.start_time, run_time);
    assert_eq!(after_reschedule.key, key);
}
}