spring_job/
extractor.rs

1use crate::{JobId, JobScheduler};
2use serde::de::DeserializeOwned;
3use spring::async_trait;
4use spring::config::ConfigRegistry;
5use spring::plugin::ComponentRegistry;
6use spring::{app::App, config::Configurable};
7use std::ops::{Deref, DerefMut};
8
9#[async_trait]
10pub trait FromApp {
11    async fn from_app(job_id: &JobId, scheduler: &JobScheduler, app: &App) -> Self;
12}
13
/// Extractor wrapper around an owned value of a component type `T`.
///
/// The wrapped value is public and reachable as `.0`.
pub struct Component<T>(pub T)
where
    T: Clone;
15
16#[async_trait]
17impl<T> FromApp for Component<T>
18where
19    T: Clone + Send + Sync + 'static,
20{
21    async fn from_app(_job_id: &JobId, _scheduler: &JobScheduler, app: &App) -> Self {
22        match app.get_component_ref::<T>() {
23            Some(component) => Component(T::clone(&component)),
24            None => panic!(
25                "There is no component of `{}` type",
26                std::any::type_name::<T>()
27            ),
28        }
29    }
30}
31
32impl<T: Clone> Deref for Component<T> {
33    type Target = T;
34
35    fn deref(&self) -> &Self::Target {
36        &self.0
37    }
38}
39
40impl<T: Clone> DerefMut for Component<T> {
41    fn deref_mut(&mut self) -> &mut Self::Target {
42        &mut self.0
43    }
44}
45
46#[async_trait]
47impl FromApp for JobId {
48    async fn from_app(job_id: &JobId, _scheduler: &JobScheduler, _app: &App) -> Self {
49        *job_id
50    }
51}
52
53#[async_trait]
54impl FromApp for JobScheduler {
55    async fn from_app(_job_id: &JobId, scheduler: &JobScheduler, _app: &App) -> Self {
56        scheduler.clone()
57    }
58}
59
60pub struct Data<T: DeserializeOwned>(pub T);
61
62#[async_trait]
63impl<T: DeserializeOwned> FromApp for Data<T> {
64    async fn from_app(job_id: &JobId, scheduler: &JobScheduler, _app: &App) -> Self {
65        let mut guard = scheduler.context.metadata_storage.write().await;
66        let job = guard.get(*job_id).await.expect("job get failed");
67        let data =
68            job.map(|j| serde_json::from_slice(&j.extra).expect("job extra parse to json failed"));
69        Self(data.expect("job extra parse is empty"))
70    }
71}
72
73pub struct Config<T>(pub T)
74where
75    T: serde::de::DeserializeOwned + Configurable;
76
77#[async_trait]
78impl<T> FromApp for Config<T>
79where
80    T: serde::de::DeserializeOwned + Configurable,
81{
82    async fn from_app(_job_id: &JobId, _scheduler: &JobScheduler, app: &App) -> Self {
83        match app.get_config::<T>() {
84            Ok(config) => Config(config),
85            Err(e) => panic!(
86                "get config for typeof {} failed: {}",
87                std::any::type_name::<T>(),
88                e
89            ),
90        }
91    }
92}
93
94impl<T> Deref for Config<T>
95where
96    T: serde::de::DeserializeOwned + Configurable,
97{
98    type Target = T;
99
100    fn deref(&self) -> &Self::Target {
101        &self.0
102    }
103}