ratchjob/schedule/
batch_call.rs

1use crate::job::model::actor_model::JobManagerRaftReq;
2use crate::openapi::xxljob::model::server_request::CallbackParam;
3use crate::raft::cluster::route::RaftRequestRoute;
4use crate::raft::store::ClientRequest;
5use crate::schedule::model::actor_model::ScheduleManagerRaftReq;
6use crate::task::model::task::{JobTaskInfo, TaskCallBackParam};
7use actix::prelude::*;
8use bean_factory::{bean, BeanFactory, FactoryData, Inject};
9use futures_util::task::SpawnExt;
10use std::sync::Arc;
11use tokio::sync::oneshot::Sender;
12
13#[derive(Debug, Default)]
14pub struct CallbackGroup {
15    pub params: Vec<TaskCallBackParam>,
16    pub senders: Vec<Sender<bool>>,
17}
18
19impl CallbackGroup {
20    pub fn new() -> Self {
21        Self {
22            params: vec![],
23            senders: vec![],
24        }
25    }
26
27    pub fn is_empty(&self) -> bool {
28        self.senders.is_empty()
29    }
30}
31
32#[bean(inject)]
33pub struct BatchCallManager {
34    raft_request_route: Option<Arc<RaftRequestRoute>>,
35    callback_cache: Option<CallbackGroup>,
36    task_cache: Option<Vec<Arc<JobTaskInfo>>>,
37    callback_batch_max_count: usize,
38}
39
40impl BatchCallManager {
41    pub fn new() -> Self {
42        Self {
43            raft_request_route: None,
44            callback_cache: Some(CallbackGroup::new()),
45            task_cache: Some(vec![]),
46            callback_batch_max_count: 500,
47        }
48    }
49
50    async fn do_callback(
51        params: CallbackGroup,
52        raft_request_route: Option<Arc<RaftRequestRoute>>,
53    ) -> anyhow::Result<()> {
54        let mut result = false;
55        if let Some(raft_request_route) = raft_request_route {
56            if let Ok(_) = raft_request_route
57                .request(ClientRequest::ScheduleReq {
58                    req: ScheduleManagerRaftReq::TaskCallBacks(params.params),
59                })
60                .await
61            {
62                result = true;
63            }
64        }
65        for sender in params.senders {
66            sender.send(result).ok();
67        }
68        Ok(())
69    }
70
71    fn callback_cache_is_empty(&self) -> bool {
72        if let Some(callback_cache) = self.callback_cache.as_ref() {
73            callback_cache.is_empty()
74        } else {
75            true
76        }
77    }
78
79    fn task_cache_is_empty(&self) -> bool {
80        if let Some(task_cache) = self.task_cache.as_ref() {
81            task_cache.is_empty()
82        } else {
83            true
84        }
85    }
86
87    fn callback(&mut self, ctx: &mut Context<Self>) {
88        if !self.callback_cache_is_empty() {
89            let old_group = self.callback_cache.replace(CallbackGroup::new());
90            if old_group.is_none() {
91                return;
92            }
93            let group = old_group.unwrap();
94            let raft_request_route = self.raft_request_route.clone();
95            BatchCallManager::do_callback(group, raft_request_route)
96                .into_actor(self)
97                .map(|_res, _act, _ctx| {})
98                .spawn(ctx);
99        }
100    }
101
102    fn update_tasks(&mut self, ctx: &mut Context<Self>) {
103        if !self.task_cache_is_empty() {
104            let old_group = self.task_cache.replace(vec![]);
105            if old_group.is_none() {
106                return;
107            }
108            let group = old_group.unwrap();
109            let raft_request_route = self.raft_request_route.clone();
110            BatchCallManager::notify_update_task(raft_request_route, group)
111                .into_actor(self)
112                .map(|_res, _act, _ctx| {})
113                .spawn(ctx);
114        }
115    }
116
117    fn heartbeat(&mut self, ctx: &mut Context<Self>) {
118        ctx.run_later(std::time::Duration::from_millis(450), move |act, ctx| {
119            act.update_tasks(ctx);
120            act.callback(ctx);
121            act.heartbeat(ctx);
122        });
123    }
124
125    async fn notify_update_task(
126        raft_request_route: Option<Arc<RaftRequestRoute>>,
127        tasks: Vec<Arc<JobTaskInfo>>,
128    ) -> anyhow::Result<()> {
129        if tasks.is_empty() {
130            return Ok(());
131        }
132        if let Some(raft_request_route) = raft_request_route {
133            raft_request_route
134                .request(ClientRequest::JobReq {
135                    req: JobManagerRaftReq::UpdateTaskList(tasks),
136                })
137                .await?;
138        }
139        Ok(())
140    }
141}
142
143impl Actor for BatchCallManager {
144    type Context = Context<Self>;
145    fn started(&mut self, _ctx: &mut Self::Context) {
146        log::info!("BatchCallManager started")
147    }
148}
149
150impl Inject for BatchCallManager {
151    type Context = Context<Self>;
152
153    fn inject(
154        &mut self,
155        factory_data: FactoryData,
156        _factory: BeanFactory,
157        ctx: &mut Self::Context,
158    ) {
159        self.raft_request_route = factory_data.get_bean();
160        self.heartbeat(ctx);
161    }
162}
163
164#[derive(Debug, Message)]
165#[rtype(result = "anyhow::Result<()>")]
166pub enum BatchCallManagerReq {
167    Callback(Vec<CallbackParam>),
168}
169
170#[derive(Debug, Message)]
171#[rtype(result = "anyhow::Result<()>")]
172pub enum BatchUpdateTaskManagerReq {
173    UpdateTask(Arc<JobTaskInfo>),
174}
175
176impl Handler<BatchCallManagerReq> for BatchCallManager {
177    type Result = ResponseActFuture<Self, anyhow::Result<()>>;
178
179    fn handle(&mut self, msg: BatchCallManagerReq, ctx: &mut Self::Context) -> Self::Result {
180        let mut count = 0;
181        let rx = match msg {
182            BatchCallManagerReq::Callback(params) => {
183                let (tx, rx) = tokio::sync::oneshot::channel();
184                if let Some(callback_cache) = self.callback_cache.as_mut() {
185                    for param in params {
186                        callback_cache.params.push(param.into());
187                    }
188                    callback_cache.senders.push(tx);
189                    count = callback_cache.senders.len();
190                }
191                rx
192            }
193        };
194        let fut = async move {
195            if rx.await? {
196                Ok(())
197            } else {
198                Err(anyhow::anyhow!("callback error"))
199            }
200        }
201        .into_actor(self)
202        .map(|res: anyhow::Result<()>, _act, _ctx| res);
203        if count >= self.callback_batch_max_count {
204            //更新外部反馈前,先更新内部反馈
205            self.update_tasks(ctx);
206            self.callback(ctx);
207        }
208        Box::pin(fut)
209    }
210}
211
212impl Handler<BatchUpdateTaskManagerReq> for BatchCallManager {
213    type Result = anyhow::Result<()>;
214
215    fn handle(&mut self, msg: BatchUpdateTaskManagerReq, ctx: &mut Self::Context) -> Self::Result {
216        let trigger_batch_max_count = self.callback_batch_max_count;
217        match msg {
218            BatchUpdateTaskManagerReq::UpdateTask(task) => {
219                if let Some(task_cache) = self.task_cache.as_mut() {
220                    task_cache.push(task);
221                    if task_cache.len() >= trigger_batch_max_count {
222                        self.update_tasks(ctx);
223                    }
224                }
225            }
226        }
227        Ok(())
228    }
229}