ratchjob/schedule/
batch_call.rs1use crate::job::model::actor_model::JobManagerRaftReq;
2use crate::openapi::xxljob::model::server_request::CallbackParam;
3use crate::raft::cluster::route::RaftRequestRoute;
4use crate::raft::store::ClientRequest;
5use crate::schedule::model::actor_model::ScheduleManagerRaftReq;
6use crate::task::model::task::{JobTaskInfo, TaskCallBackParam};
7use actix::prelude::*;
8use bean_factory::{bean, BeanFactory, FactoryData, Inject};
9use futures_util::task::SpawnExt;
10use std::sync::Arc;
11use tokio::sync::oneshot::Sender;
12
13#[derive(Debug, Default)]
14pub struct CallbackGroup {
15 pub params: Vec<TaskCallBackParam>,
16 pub senders: Vec<Sender<bool>>,
17}
18
19impl CallbackGroup {
20 pub fn new() -> Self {
21 Self {
22 params: vec![],
23 senders: vec![],
24 }
25 }
26
27 pub fn is_empty(&self) -> bool {
28 self.senders.is_empty()
29 }
30}
31
32#[bean(inject)]
33pub struct BatchCallManager {
34 raft_request_route: Option<Arc<RaftRequestRoute>>,
35 callback_cache: Option<CallbackGroup>,
36 task_cache: Option<Vec<Arc<JobTaskInfo>>>,
37 callback_batch_max_count: usize,
38}
39
40impl BatchCallManager {
41 pub fn new() -> Self {
42 Self {
43 raft_request_route: None,
44 callback_cache: Some(CallbackGroup::new()),
45 task_cache: Some(vec![]),
46 callback_batch_max_count: 500,
47 }
48 }
49
50 async fn do_callback(
51 params: CallbackGroup,
52 raft_request_route: Option<Arc<RaftRequestRoute>>,
53 ) -> anyhow::Result<()> {
54 let mut result = false;
55 if let Some(raft_request_route) = raft_request_route {
56 if let Ok(_) = raft_request_route
57 .request(ClientRequest::ScheduleReq {
58 req: ScheduleManagerRaftReq::TaskCallBacks(params.params),
59 })
60 .await
61 {
62 result = true;
63 }
64 }
65 for sender in params.senders {
66 sender.send(result).ok();
67 }
68 Ok(())
69 }
70
71 fn callback_cache_is_empty(&self) -> bool {
72 if let Some(callback_cache) = self.callback_cache.as_ref() {
73 callback_cache.is_empty()
74 } else {
75 true
76 }
77 }
78
79 fn task_cache_is_empty(&self) -> bool {
80 if let Some(task_cache) = self.task_cache.as_ref() {
81 task_cache.is_empty()
82 } else {
83 true
84 }
85 }
86
87 fn callback(&mut self, ctx: &mut Context<Self>) {
88 if !self.callback_cache_is_empty() {
89 let old_group = self.callback_cache.replace(CallbackGroup::new());
90 if old_group.is_none() {
91 return;
92 }
93 let group = old_group.unwrap();
94 let raft_request_route = self.raft_request_route.clone();
95 BatchCallManager::do_callback(group, raft_request_route)
96 .into_actor(self)
97 .map(|_res, _act, _ctx| {})
98 .spawn(ctx);
99 }
100 }
101
102 fn update_tasks(&mut self, ctx: &mut Context<Self>) {
103 if !self.task_cache_is_empty() {
104 let old_group = self.task_cache.replace(vec![]);
105 if old_group.is_none() {
106 return;
107 }
108 let group = old_group.unwrap();
109 let raft_request_route = self.raft_request_route.clone();
110 BatchCallManager::notify_update_task(raft_request_route, group)
111 .into_actor(self)
112 .map(|_res, _act, _ctx| {})
113 .spawn(ctx);
114 }
115 }
116
117 fn heartbeat(&mut self, ctx: &mut Context<Self>) {
118 ctx.run_later(std::time::Duration::from_millis(450), move |act, ctx| {
119 act.update_tasks(ctx);
120 act.callback(ctx);
121 act.heartbeat(ctx);
122 });
123 }
124
125 async fn notify_update_task(
126 raft_request_route: Option<Arc<RaftRequestRoute>>,
127 tasks: Vec<Arc<JobTaskInfo>>,
128 ) -> anyhow::Result<()> {
129 if tasks.is_empty() {
130 return Ok(());
131 }
132 if let Some(raft_request_route) = raft_request_route {
133 raft_request_route
134 .request(ClientRequest::JobReq {
135 req: JobManagerRaftReq::UpdateTaskList(tasks),
136 })
137 .await?;
138 }
139 Ok(())
140 }
141}
142
143impl Actor for BatchCallManager {
144 type Context = Context<Self>;
145 fn started(&mut self, _ctx: &mut Self::Context) {
146 log::info!("BatchCallManager started")
147 }
148}
149
150impl Inject for BatchCallManager {
151 type Context = Context<Self>;
152
153 fn inject(
154 &mut self,
155 factory_data: FactoryData,
156 _factory: BeanFactory,
157 ctx: &mut Self::Context,
158 ) {
159 self.raft_request_route = factory_data.get_bean();
160 self.heartbeat(ctx);
161 }
162}
163
164#[derive(Debug, Message)]
165#[rtype(result = "anyhow::Result<()>")]
166pub enum BatchCallManagerReq {
167 Callback(Vec<CallbackParam>),
168}
169
170#[derive(Debug, Message)]
171#[rtype(result = "anyhow::Result<()>")]
172pub enum BatchUpdateTaskManagerReq {
173 UpdateTask(Arc<JobTaskInfo>),
174}
175
176impl Handler<BatchCallManagerReq> for BatchCallManager {
177 type Result = ResponseActFuture<Self, anyhow::Result<()>>;
178
179 fn handle(&mut self, msg: BatchCallManagerReq, ctx: &mut Self::Context) -> Self::Result {
180 let mut count = 0;
181 let rx = match msg {
182 BatchCallManagerReq::Callback(params) => {
183 let (tx, rx) = tokio::sync::oneshot::channel();
184 if let Some(callback_cache) = self.callback_cache.as_mut() {
185 for param in params {
186 callback_cache.params.push(param.into());
187 }
188 callback_cache.senders.push(tx);
189 count = callback_cache.senders.len();
190 }
191 rx
192 }
193 };
194 let fut = async move {
195 if rx.await? {
196 Ok(())
197 } else {
198 Err(anyhow::anyhow!("callback error"))
199 }
200 }
201 .into_actor(self)
202 .map(|res: anyhow::Result<()>, _act, _ctx| res);
203 if count >= self.callback_batch_max_count {
204 self.update_tasks(ctx);
206 self.callback(ctx);
207 }
208 Box::pin(fut)
209 }
210}
211
212impl Handler<BatchUpdateTaskManagerReq> for BatchCallManager {
213 type Result = anyhow::Result<()>;
214
215 fn handle(&mut self, msg: BatchUpdateTaskManagerReq, ctx: &mut Self::Context) -> Self::Result {
216 let trigger_batch_max_count = self.callback_batch_max_count;
217 match msg {
218 BatchUpdateTaskManagerReq::UpdateTask(task) => {
219 if let Some(task_cache) = self.task_cache.as_mut() {
220 task_cache.push(task);
221 if task_cache.len() >= trigger_batch_max_count {
222 self.update_tasks(ctx);
223 }
224 }
225 }
226 }
227 Ok(())
228 }
229}