use mongodb::bson::{doc, to_bson, to_document, DateTime};
use mongodb::error::{ErrorKind, WriteFailure};
use mongodb::options::UpdateOptions;
use mongodb::Collection;
use serde::Serialize;
use tracing::{error, instrument, trace};
use typed_builder::TypedBuilder;
use crate::tasker::error::{MResult, MSchedulerError};
use crate::tasker::task::{Task, TaskOption};
/// Producer side of the task queue: holds the MongoDB collection that tasks
/// are written into.
///
/// `T` and `K` are the two type parameters of the stored [`Task`] document.
pub struct TaskProducer<T, K>
where
    T: Send + Sync,
    K: Send + Sync,
{
    /// Collection that `send_task` upserts task documents into.
    task_collection: Collection<Task<T, K>>,
}
/// Options accepted by `TaskProducer::send_task`.
///
/// Build it with `SendTaskOption::builder()`; every field has a default, so
/// `SendTaskOption::builder().build()` is valid on its own.
#[derive(Debug, Clone, TypedBuilder)]
#[builder(field_defaults(default, setter(into)))]
#[non_exhaustive]
pub struct SendTaskOption {
    /// When `true`, `send_task` writes the params with `$set`, so they also
    /// replace the params of an already existing task. When `false` (default)
    /// the params are only written when the task is inserted.
    #[builder(default = false)]
    pub update_existing_params: bool,
    /// Requested start time. When set, `send_task` writes it with `$set`, so it
    /// also applies to an existing task. When `None` (default) the start time
    /// is "now" and is only written on insert.
    #[builder(default = None)]
    pub run_time: Option<DateTime>,
    /// Forwarded to `TaskOption::concurrent_worker_cnt`. Defaults to `1`.
    #[builder(default = 1)]
    pub concurrency_cnt: u32,
    /// When `true`, `send_task` adds a filter on `task_state.worker_states` to
    /// the upsert query. Presumably this is meant to leave tasks alone while
    /// workers are handling them; confirm against the worker side.
    #[builder(default = false)]
    pub not_update_running: bool,
    /// Forwarded to `TaskOption::ping_interval_ms`. Defaults to `30_000`.
    #[builder(default = 30_000)]
    pub ping_interval_ms: u32,
    /// Forwarded to `TaskOption::specific_worker_ids`. Defaults to empty.
    #[builder(default = vec![])]
    pub specific_worker_ids: Vec<String>,
    /// Forwarded to `TaskOption::worker_timeout_ms`. Defaults to `60_000`.
    // Public like every other option so callers can read back what they built.
    #[builder(default = 60_000)]
    pub worker_timeout_ms: u32,
}
/// Outcome of a successful `send_task` call.
///
/// The two flags are set from the upsert result: exactly one of them is `true`
/// in every value `send_task` returns.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SendTaskResult {
    /// `true` when the upsert created a new task document.
    pub insert_new: bool,
    /// `true` when the upsert matched an already existing task document.
    pub update_existing: bool,
}
impl<T: Serialize + Send + Sync, K: Serialize + Send + Sync> TaskProducer<T, K> {
pub fn create(collection: Collection<Task<T, K>>) -> MResult<TaskProducer<T, K>> {
Ok(TaskProducer {
task_collection: collection,
})
}
#[instrument(skip_all, fields(key=%key.as_ref()))]
pub async fn send_task(
&self,
key: impl AsRef<str>,
params: T,
option: Option<SendTaskOption>,
) -> MResult<SendTaskResult> {
let send_option = option.unwrap_or_else(|| SendTaskOption::builder().build());
let mut query = doc! { "key": key.as_ref()};
if send_option.not_update_running {
let doc = doc! {
"$all":[{
"$or":[
{"success_time":{"$eq": null}},
{"fail_time":{"$eq": null}},
]
}]
};
query.insert("task_state.worker_states", doc);
}
let now = DateTime::now();
let start_time = send_option.run_time.clone().unwrap_or(now);
let task_option = TaskOption {
priority: 0,
concurrent_worker_cnt: send_option.concurrency_cnt,
ping_interval_ms: send_option.ping_interval_ms,
worker_timeout_ms: send_option.worker_timeout_ms,
specific_worker_ids: send_option.specific_worker_ids,
max_unexpected_retries: 3,
unexpected_retry_delay_ms: 10_000,
};
let mut update_part = doc! {
"$setOnInsert": doc! {
"key":key.as_ref(),
"task_state.create_time":now,
"task_state.start_time":start_time,
"task_state.worker_states":[],
"task_option":to_document(&task_option).expect("cannot convert to task_option"),
},
};
let mut updates = vec![];
if send_option.update_existing_params {
updates.push(("params", to_bson(¶ms).unwrap()));
} else {
let set_on_insert_part = update_part.get_mut("$setOnInsert").unwrap();
let set_on_insert_doc = set_on_insert_part.as_document_mut().unwrap();
set_on_insert_doc.insert("params", to_bson(¶ms).unwrap());
}
if let Some(_) = send_option.run_time {
let set_on_insert_part = update_part.get_mut("$setOnInsert").unwrap();
let set_on_insert_doc = set_on_insert_part.as_document_mut().unwrap();
let task_state_part = set_on_insert_doc.remove("task_state.start_time").unwrap();
updates.push(("task_state.start_time", task_state_part));
}
if !updates.is_empty() {
let mut document = doc! {};
for update in updates {
document.insert(update.0, update.1);
}
update_part.insert("$set", document);
}
match self
.task_collection
.update_one(query, update_part)
.with_options(UpdateOptions::builder().upsert(true).build())
.await
{
Ok(v) => {
if v.upserted_id.is_some() {
trace!("send task and new task is created key={}", key.as_ref());
Ok(SendTaskResult {
insert_new: true,
update_existing: false,
})
} else if v.matched_count == 1 {
trace!("send task ignored key={}", key.as_ref());
Ok(SendTaskResult {
insert_new: false,
update_existing: true,
})
} else if v.matched_count == 0 {
trace!("send task is ignored key={}", key.as_ref());
Err(MSchedulerError::NoTaskMatched)
} else {
error!("send task is failed key={}", key.as_ref());
Err(MSchedulerError::AddTaskFailed)
}
}
Err(e) => match e.kind.as_ref() {
ErrorKind::Write(WriteFailure::WriteError(write_error)) => {
if write_error.code == 11000 {
trace!("task inserted failed, duplicated key");
Err(MSchedulerError::DuplicatedTaskId)
} else {
error!("failed to send task {}", e);
Err(MSchedulerError::MongoDbError(e.into()))
}
}
_ => {
error!("unknown mongodb error occurred during insert {:?}", &e);
Err(MSchedulerError::MongoDbError(e.into()))
}
},
}
}
}