1use actix::prelude::*;
2use serde;
3use serde_derive::{Deserialize, Serialize};
4use serde_json;
5use serde_json::json;
6use uuid::Uuid;
7
8use crate::{
9 center::send::*,
10 control::message::StopTask,
11 worker::{
12 client::*,
13 controller::{WorkerController},
14 plugin::{WorkerPlugin},
15 task_reader::TaskReader,
16 tracker,
17 worker_message::{WorkerMessage, Dest, WorkerMessagePayload},
18 },
19};
20
21#[derive(Clone)]
22pub enum ControllerAddr {
23 Controller(Addr<WorkerController>),
24 Reader(Addr<TaskReader>),
25 None,
26}
27
28#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)]
29#[serde(rename_all = "snake_case")]
30pub enum TaskStatus {
31 Unknown,
32 Running,
33 Suspended,
34 FinishedSuccess,
35 FinishedFailure,
36}
37
38pub struct TaskExecutionContext {
39 pub task_uuid: String,
40 pub parent_task_uuid: String,
41 pub stop_task_addr: Recipient<StopTask>,
42 pub controller_addr: ControllerAddr,
43}
44
45impl TaskExecutionContext {
46 pub fn send_worker_message(&self, msg: WorkerMessage) {
47 if let ControllerAddr::Controller(addr) = &self.controller_addr {
48 addr.do_send(msg);
49 }
50 }
51}
52
53pub trait TaskWrapper: Send + Sync {
54 fn execute_in_arbiter(
55 &self,
56 arbiter: &ArbiterHandle,
57 controller_addr: ControllerAddr,
58 ) -> TaskExecutionContext;
59
60 fn uuid(&self) -> &str;
61
62 fn parent_uuid(&self) -> &str;
63
64 fn worker_id(&self) -> &str;
65
66 fn update_worker_id(&mut self, worker_id: String);
67
68 fn update_task_uuid(&mut self);
70
71 fn clone_box(&self) -> Box<dyn TaskWrapper>;
72
73 fn plugin(&self) -> WorkerPlugin;
74
75 fn name(&self) -> &str;
76}
77
78pub trait TaskDefinition {
79
80 fn update_task_uuid(&mut self, task_uuid: String);
81
82 fn update_worker_id(&mut self, task_uuid: String);
83
84 fn parent_task_uuid(&self) -> &str;
85
86 fn plugin(&self) -> WorkerPlugin;
87
88 fn name(&self) -> &str;
89}
90
91#[derive(Clone, Serialize, Deserialize)]
92pub struct GenTaskDefinition<P> {
93
94 pub executor_path: String,
95
96 pub params: P,
98
99 pub task_uuid: String,
100
101 pub name: String,
102
103 pub parent_task_uuid: String,
105
106 pub worker_id: String,
108
109 pub plugin: WorkerPlugin,
111}
112
113impl<P> TaskDefinition for GenTaskDefinition<P> {
114 fn update_task_uuid(&mut self, task_uuid: String) {
115 self.task_uuid = task_uuid;
116 }
117
118 fn update_worker_id(&mut self, worker_id: String) {
119 self.worker_id = worker_id;
120 }
121
122 fn parent_task_uuid(&self) -> &str { &self.parent_task_uuid }
123
124 fn plugin(&self) -> WorkerPlugin { self.plugin }
125
126 fn name(&self) -> &str { &self.name }
127}
128
129impl<P> GenTaskDefinition<P>
130where
131 P: serde::Serialize + Clone
132{
133 pub fn new(
134 plugin: WorkerPlugin,
135 executor_path: &str,
136 params: P,
137 name: &str,
138 ) -> Self {
139 Self {
140 executor_path: executor_path.to_string(),
141 params,
142 task_uuid: String::new(),
143 name: name.to_string(),
144 parent_task_uuid: String::new(),
145 worker_id: String::new(),
146 plugin,
147 }
148 }
149
150 pub fn subtask(
151 plugin: WorkerPlugin,
152 executor_path: &str,
153 params: P,
154 parent_task_uuid: String,
155 name: &str,
156 ) -> Self {
157 Self {
158 executor_path: executor_path.to_string(),
159 params,
160 task_uuid: String::new(),
161 name: name.to_string(),
162 parent_task_uuid,
163 worker_id: String::new(),
164 plugin,
165 }
166 }
167
168 pub fn new_none_plugin(params: P, name: &str) -> Self {
169 Self::new(WorkerPlugin::None, "", params, name)
170 }
171
172 pub fn subtask_none_plugin(
173 params: P,
174 parent_task_uuid: String,
175 name: &str,
176 ) -> Self {
177 Self::subtask(WorkerPlugin::None, "", params, parent_task_uuid, name)
178 }
179
180 pub fn make_message(&self) -> WorkerMessage {
181 let params = serde_json::to_value(self.params.clone()).unwrap();
182 let data = json!({
183 "task": {
184 "executor_path": self.executor_path,
185 "params": params,
186 "name": self.name,
187 }
188 });
189
190 let dest = Dest::Worker;
191
192 let payload = WorkerMessagePayload {
193 dest,
194 worker_id: self.worker_id.clone(),
195 task_uuid: self.task_uuid.clone(),
196 plugin: WorkerPlugin::as_str(self.plugin).to_string(),
197 data,
198 };
199
200 WorkerMessage::new(payload)
201 }
202
203 pub fn make_message_with_data(
204 &self,
205 data: serde_json::Value
206 ) -> WorkerMessage {
207 let dest = Dest::Worker;
208
209 let payload = WorkerMessagePayload {
210 dest,
211 worker_id: self.worker_id.clone(),
212 task_uuid: self.task_uuid.clone(),
213 plugin: WorkerPlugin::as_str(self.plugin).to_string(),
214 data,
215 };
216
217 WorkerMessage::new(payload)
218 }
219}
220
221#[derive(Clone, Default)]
222pub struct WorkerTask<C>
223where
224 C: WorkerClient,
225 C: Actor<Context=Context<C>>,
226{
227 pub task_uuid: String,
228 pub worker_id: String,
229 pub task_definition: C::TaskDefinition,
230}
231
232impl<C: WorkerClient + Send + Sync> WorkerTask<C>
233where
234 C::TaskDefinition: TaskDefinition,
235 C: Actor<Context=Context<C>>,
236{
237 pub fn new(mut task_definition: C::TaskDefinition) -> Self {
238 let task_uuid = Uuid::new_v4().to_string();
239 task_definition.update_task_uuid(task_uuid.clone());
240 Self {
241 task_uuid,
242 worker_id: String::new(),
243 task_definition,
244 }
245 }
246
247 pub fn new_with_uuid(
248 mut task_definition: C::TaskDefinition,
249 task_uuid: String
250 ) -> Self {
251 task_definition.update_task_uuid(task_uuid.clone());
252 Self {
253 task_uuid,
254 worker_id: String::new(),
255 task_definition,
256 }
257 }
258}
259
260impl<C> TaskWrapper for WorkerTask<C>
261where
262 C: WorkerClient,
263 C: Actor<Context=Context<C>>,
264 Self: Send + Sync,
265 C::TaskDefinition: Clone + TaskDefinition + Send + Sync +
266 serde::Serialize,
267{
268 fn execute_in_arbiter(
269 &self,
270 arbiter: &ArbiterHandle,
271 controller_addr: ControllerAddr,
272 ) -> TaskExecutionContext {
273 let controller_addr_clone = controller_addr.clone();
274
275 let parent_task_uuid =
276 self.task_definition.parent_task_uuid().to_string();
277
278 if !parent_task_uuid.is_empty() {
279 tracker::subscribe_no_addr(
280 self.task_uuid.clone(),
281 parent_task_uuid.clone(),
282 self.task_definition.name().into(),
283 false,
284 );
285 }
286
287 let client_ctx = ClientContext {
288 task_uuid: self.task_uuid.clone(),
289 worker_id: self.worker_id.clone(),
290 controller_addr,
291 task_definition: self.task_definition.clone(),
292 };
293 let client_addr = C::start_in_arbiter_(arbiter, client_ctx);
294
295 send_center_task_started(
296 &self.task_uuid,
297 &self.task_definition,
298 self.task_definition.name(),
299 );
300
301 TaskExecutionContext {
302 task_uuid: self.uuid().to_string(),
303 parent_task_uuid,
304 stop_task_addr: client_addr.recipient::<StopTask>(),
305 controller_addr: controller_addr_clone,
306 }
307 }
308
309 fn uuid(&self) -> &str { &self.task_uuid }
310
311 fn parent_uuid(&self) -> &str { &self.task_definition.parent_task_uuid() }
312
313 fn worker_id(&self) -> &str { &self.worker_id }
314
315 fn update_worker_id(&mut self, worker_id: String) {
316 self.task_definition.update_worker_id(worker_id.clone());
317 self.worker_id = worker_id;
318 }
319
320 fn update_task_uuid(&mut self) {
321 self.task_uuid = Uuid::new_v4().to_string();
322 self.task_definition.update_task_uuid(self.task_uuid.clone());
323 }
324
325 fn clone_box(&self) -> Box<dyn TaskWrapper> { Box::new((*self).clone()) }
326
327 fn plugin(&self) -> WorkerPlugin { self.task_definition.plugin() }
328
329 fn name(&self) -> &str { self.task_definition.name() }
330}
331