yq_async/
async_job.rs

1use async_trait::async_trait;
2use std::collections::HashMap;
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::Arc;
6use yq::{decode_job, Job, JobType, YqError, YqResult, YqRunJobError};
7
8#[async_trait]
9pub trait AsyncJob: Job + 'static + Send {
10    async fn execute_async(self, mid: i64, state: Self::State) -> Result<(), String>;
11}
12
/// Heap-allocated, pinned, `Send` future returned by a job handler; it resolves
/// to `Ok(())` or to an error message.
type AsyncJobFuture = Pin<Box<dyn Future<Output = Result<(), String>> + Send>>;

/// Reference-counted, `Send + Sync` job handler.
///
/// The handler is called with an `i64`, a `String` and a state value of type
/// `S` (in that order) and returns the boxed future that performs the work.
type AsyncJobFn<S> = Arc<dyn Fn(i64, String, S) -> AsyncJobFuture + Send + Sync>;
18
19pub(crate) struct AsyncJobFns<S>(HashMap<JobType, AsyncJobFn<S>>);
20
21impl<S> AsyncJobFns<S> {
22    pub(crate) fn new() -> AsyncJobFns<S> {
23        AsyncJobFns::<S>(HashMap::default())
24    }
25
26    pub(crate) fn reg_job(&mut self, job_type: JobType, job_fn: AsyncJobFn<S>) -> YqResult<()> {
27        if self.0.insert(job_type.clone(), job_fn).is_some() {
28            Err(YqError::DupJobType(job_type))
29        } else {
30            Ok(())
31        }
32    }
33
34    pub(crate) async fn handle(&self, mid: i64, mcontent: String, state: S) -> YqResult<()> {
35        let (job_type, job_data) = decode_job(&mcontent)?;
36
37        let job_fn = match self.0.get(job_type) {
38            Some(job_fn) => job_fn,
39            None => {
40                return Err(YqError::JobTypeMissing(JobType::from(job_type.to_string())));
41            }
42        };
43
44        job_fn(mid, job_data.to_string(), state)
45            .await
46            .map_err(|error| YqError::RunJobError(YqRunJobError::new(job_data.to_string(), error)))
47    }
48}