// patoka/worker/task.rs

1use actix::prelude::*;
2use serde;
3use serde_derive::{Deserialize, Serialize};
4use serde_json;
5use serde_json::json;
6use uuid::Uuid;
7
8use crate::{
9    center::send::*,
10    control::message::StopTask,
11    worker::{
12        client::*,
13        controller::{WorkerController},
14        plugin::{WorkerPlugin},
15        task_reader::TaskReader,
16        tracker,
17        worker_message::{WorkerMessage, Dest, WorkerMessagePayload},
18    },
19};
20
21#[derive(Clone)]
22pub enum ControllerAddr {
23    Controller(Addr<WorkerController>),
24    Reader(Addr<TaskReader>),
25    None,
26}
27
28#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)]
29#[serde(rename_all = "snake_case")]
30pub enum TaskStatus {
31    Unknown,
32    Running,
33    Suspended,
34    FinishedSuccess,
35    FinishedFailure,
36}
37
38pub struct TaskExecutionContext {
39    pub task_uuid: String,
40    pub parent_task_uuid: String,
41    pub stop_task_addr: Recipient<StopTask>,
42    pub controller_addr: ControllerAddr,
43}
44
45impl TaskExecutionContext {
46    pub fn send_worker_message(&self, msg: WorkerMessage) {
47        if let ControllerAddr::Controller(addr) = &self.controller_addr {
48            addr.do_send(msg);
49        }
50    }
51}
52
53pub trait TaskWrapper: Send + Sync {
54    fn execute_in_arbiter(
55        &self,
56        arbiter: &ArbiterHandle,
57        controller_addr: ControllerAddr,
58    ) -> TaskExecutionContext;
59
60    fn uuid(&self) -> &str;
61
62    fn parent_uuid(&self) -> &str;
63
64    fn worker_id(&self) -> &str;
65
66    fn update_worker_id(&mut self, worker_id: String);
67
68    /// Used when the task is restarted.
69    fn update_task_uuid(&mut self);
70
71    fn clone_box(&self) -> Box<dyn TaskWrapper>;
72
73    fn plugin(&self) -> WorkerPlugin;
74
75    fn name(&self) -> &str;
76}
77
78pub trait TaskDefinition {
79
80    fn update_task_uuid(&mut self, task_uuid: String);
81
82    fn update_worker_id(&mut self, task_uuid: String);
83
84    fn parent_task_uuid(&self) -> &str;
85
86    fn plugin(&self) -> WorkerPlugin;
87
88    fn name(&self) -> &str;
89}
90
91#[derive(Clone, Serialize, Deserialize)]
92pub struct GenTaskDefinition<P> {
93
94    pub executor_path: String,
95
96    /// Parameters to pass to the executor.
97    pub params: P,
98
99    pub task_uuid: String,
100
101    pub name: String,
102
103    /// Empty string for the master task.
104    pub parent_task_uuid: String,
105
106    /// Optional: if the task is linked with a particular worker.
107    pub worker_id: String,
108
109    /// Worker plugin that must be active to execute the task.
110    pub plugin: WorkerPlugin,
111}
112
113impl<P> TaskDefinition for GenTaskDefinition<P> {
114    fn update_task_uuid(&mut self, task_uuid: String) {
115        self.task_uuid = task_uuid;
116    }
117
118    fn update_worker_id(&mut self, worker_id: String) {
119        self.worker_id = worker_id;
120    }
121
122    fn parent_task_uuid(&self) -> &str { &self.parent_task_uuid }
123
124    fn plugin(&self) -> WorkerPlugin { self.plugin }
125
126    fn name(&self) -> &str { &self.name }
127}
128
129impl<P> GenTaskDefinition<P>
130where
131    P: serde::Serialize + Clone
132{
133    pub fn new(
134        plugin: WorkerPlugin,
135        executor_path: &str,
136        params: P,
137        name: &str,
138    ) -> Self {
139        Self {
140            executor_path: executor_path.to_string(),
141            params,
142            task_uuid: String::new(),
143            name: name.to_string(),
144            parent_task_uuid: String::new(),
145            worker_id: String::new(),
146            plugin,
147        }
148    }
149
150    pub fn subtask(
151        plugin: WorkerPlugin,
152        executor_path: &str,
153        params: P,
154        parent_task_uuid: String,
155        name: &str,
156    ) -> Self {
157        Self {
158            executor_path: executor_path.to_string(),
159            params,
160            task_uuid: String::new(),
161            name: name.to_string(),
162            parent_task_uuid,
163            worker_id: String::new(),
164            plugin,
165        }
166    }
167
168    pub fn new_none_plugin(params: P, name: &str) -> Self {
169        Self::new(WorkerPlugin::None, "", params, name)
170    }
171
172    pub fn subtask_none_plugin(
173        params: P,
174        parent_task_uuid: String,
175        name: &str,
176    ) -> Self {
177        Self::subtask(WorkerPlugin::None, "", params, parent_task_uuid, name)
178    }
179
180    pub fn make_message(&self) -> WorkerMessage {
181        let params = serde_json::to_value(self.params.clone()).unwrap();
182        let data = json!({
183            "task": {
184                "executor_path": self.executor_path,
185                "params": params,
186                "name": self.name,
187            }
188        });
189
190        let dest = Dest::Worker;
191
192        let payload = WorkerMessagePayload {
193            dest,
194            worker_id: self.worker_id.clone(),
195            task_uuid: self.task_uuid.clone(),
196            plugin: WorkerPlugin::as_str(self.plugin).to_string(),
197            data,
198        };
199
200        WorkerMessage::new(payload)
201    }
202
203    pub fn make_message_with_data(
204        &self,
205        data: serde_json::Value
206    ) -> WorkerMessage {
207        let dest = Dest::Worker;
208
209        let payload = WorkerMessagePayload {
210            dest,
211            worker_id: self.worker_id.clone(),
212            task_uuid: self.task_uuid.clone(),
213            plugin: WorkerPlugin::as_str(self.plugin).to_string(),
214            data,
215        };
216
217        WorkerMessage::new(payload)
218    }
219}
220
221#[derive(Clone, Default)]
222pub struct WorkerTask<C>
223where
224    C: WorkerClient,
225    C: Actor<Context=Context<C>>,
226{
227    pub task_uuid: String,
228    pub worker_id: String,
229    pub task_definition: C::TaskDefinition,
230}
231
232impl<C: WorkerClient + Send + Sync> WorkerTask<C>
233where
234    C::TaskDefinition: TaskDefinition,
235    C: Actor<Context=Context<C>>,
236{
237    pub fn new(mut task_definition: C::TaskDefinition) -> Self {
238        let task_uuid = Uuid::new_v4().to_string();
239        task_definition.update_task_uuid(task_uuid.clone());
240        Self {
241            task_uuid,
242            worker_id: String::new(),
243            task_definition,
244        }
245    }
246
247    pub fn new_with_uuid(
248        mut task_definition: C::TaskDefinition,
249        task_uuid: String
250    ) -> Self {
251        task_definition.update_task_uuid(task_uuid.clone());
252        Self {
253            task_uuid,
254            worker_id: String::new(),
255            task_definition,
256        }
257    }
258}
259
260impl<C> TaskWrapper for WorkerTask<C>
261where
262    C: WorkerClient,
263    C: Actor<Context=Context<C>>,
264    Self: Send + Sync,
265    C::TaskDefinition: Clone + TaskDefinition + Send + Sync +
266        serde::Serialize,
267{
268    fn execute_in_arbiter(
269        &self,
270        arbiter: &ArbiterHandle,
271        controller_addr: ControllerAddr,
272    ) -> TaskExecutionContext {
273        let controller_addr_clone = controller_addr.clone();
274
275        let parent_task_uuid =
276            self.task_definition.parent_task_uuid().to_string();
277
278        if !parent_task_uuid.is_empty() {
279            tracker::subscribe_no_addr(
280                self.task_uuid.clone(),
281                parent_task_uuid.clone(),
282                self.task_definition.name().into(),
283                false,
284            );
285        }
286
287        let client_ctx = ClientContext {
288            task_uuid: self.task_uuid.clone(),
289            worker_id: self.worker_id.clone(),
290            controller_addr,
291            task_definition: self.task_definition.clone(),
292        };
293        let client_addr = C::start_in_arbiter_(arbiter, client_ctx);
294
295        send_center_task_started(
296            &self.task_uuid,
297            &self.task_definition,
298            self.task_definition.name(),
299        );
300
301        TaskExecutionContext {
302            task_uuid: self.uuid().to_string(),
303            parent_task_uuid,
304            stop_task_addr: client_addr.recipient::<StopTask>(),
305            controller_addr: controller_addr_clone,
306        }
307    }
308
309    fn uuid(&self) -> &str { &self.task_uuid }
310
311    fn parent_uuid(&self) -> &str { &self.task_definition.parent_task_uuid() }
312
313    fn worker_id(&self) -> &str { &self.worker_id }
314
315    fn update_worker_id(&mut self, worker_id: String) {
316        self.task_definition.update_worker_id(worker_id.clone());
317        self.worker_id = worker_id;
318    }
319
320    fn update_task_uuid(&mut self) {
321        self.task_uuid = Uuid::new_v4().to_string();
322        self.task_definition.update_task_uuid(self.task_uuid.clone());
323    }
324
325    fn clone_box(&self) -> Box<dyn TaskWrapper> { Box::new((*self).clone()) }
326
327    fn plugin(&self) -> WorkerPlugin { self.task_definition.plugin() }
328
329    fn name(&self) -> &str { self.task_definition.name() }
330}
331