1use async_trait::async_trait;
2use std::collections::HashMap;
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::Arc;
6use yq::{decode_job, Job, JobType, YqError, YqResult, YqRunJobError};
7
8#[async_trait]
9pub trait AsyncJob: Job + 'static + Send {
10 async fn execute_async(self, mid: i64, state: Self::State) -> Result<(), String>;
11}
12
/// Heap-allocated, pinned, `Send` future returned by a type-erased job handler.
/// It resolves to `Ok(())` on success or to an error message on failure.
type AsyncJobFuture = Pin<Box<dyn Future<Output = Result<(), String>> + Send>>;

/// Shared, thread-safe, type-erased job handler.
///
/// The callable receives an `i64` id, an owned `String` payload and a state
/// value of type `S`, and returns the boxed future that performs the work.
type AsyncJobFn<S> = Arc<dyn Fn(i64, String, S) -> AsyncJobFuture + Send + Sync>;
18
19pub(crate) struct AsyncJobFns<S>(HashMap<JobType, AsyncJobFn<S>>);
20
21impl<S> AsyncJobFns<S> {
22 pub(crate) fn new() -> AsyncJobFns<S> {
23 AsyncJobFns::<S>(HashMap::default())
24 }
25
26 pub(crate) fn reg_job(&mut self, job_type: JobType, job_fn: AsyncJobFn<S>) -> YqResult<()> {
27 if self.0.insert(job_type.clone(), job_fn).is_some() {
28 Err(YqError::DupJobType(job_type))
29 } else {
30 Ok(())
31 }
32 }
33
34 pub(crate) async fn handle(&self, mid: i64, mcontent: String, state: S) -> YqResult<()> {
35 let (job_type, job_data) = decode_job(&mcontent)?;
36
37 let job_fn = match self.0.get(job_type) {
38 Some(job_fn) => job_fn,
39 None => {
40 return Err(YqError::JobTypeMissing(JobType::from(job_type.to_string())));
41 }
42 };
43
44 job_fn(mid, job_data.to_string(), state)
45 .await
46 .map_err(|error| YqError::RunJobError(YqRunJobError::new(job_data.to_string(), error)))
47 }
48}