assemble_core/task/
task_executor.rs1use crate::identifier::TaskId;
2use crate::project::Project;
3
4use crate::task::task_executor::hidden::TaskWork;
5use crate::task::ExecutableTask;
6
7use crate::work_queue::{TypedWorkerQueue, WorkerExecutor};
8use crate::BuildResult;
9use std::any::Any;
10use std::sync::Arc;
11
12use crate::project::finder::{ProjectFinder, ProjectPath, ProjectPathBuf};
13use crate::project::shared::SharedProject;
14use parking_lot::RwLock;
15use std::io;
16
17pub struct TaskExecutor<'exec> {
19 task_queue: TypedWorkerQueue<'exec, TaskWork>,
20 project: SharedProject,
21 task_returns: Arc<RwLock<Vec<(TaskId, BuildResult<(bool, bool)>)>>>,
22}
23
24impl<'exec> TaskExecutor<'exec> {
25 pub fn new(project: SharedProject, executor: &'exec WorkerExecutor) -> Self {
27 let typed_queue = executor.queue().typed();
28 Self {
29 task_queue: typed_queue,
30 project,
31 task_returns: Default::default(),
32 }
33 }
34
35 pub fn queue_task<E: ExecutableTask + 'static>(&mut self, task: E) -> io::Result<()> {
37 let project = task
38 .task_id()
39 .project_id()
40 .expect("project id should always exist at this point");
41 trace!(
42 "finding project {} to execute {} with",
43 project,
44 task.task_id()
45 );
46
47 let finder = ProjectFinder::new(&self.project);
48 let project = finder
49 .find(&ProjectPathBuf::from(project))
50 .expect("should exist");
51
52 let token = TaskWork::new(Box::new(task), &project, &self.task_returns);
53 let _ = self.task_queue.submit(token)?;
54 Ok(())
55 }
56
57 #[must_use]
60 pub fn finished_tasks(&mut self) -> Vec<(TaskId, BuildResult<(bool, bool)>)> {
61 let mut guard = self.task_returns.write();
62 guard.drain(..).collect()
63 }
64
65 pub fn finish(
67 self,
68 ) -> (
69 Vec<(TaskId, BuildResult<(bool, bool)>)>,
70 Option<Box<dyn Any + Send + 'static>>,
71 ) {
72 let error = self.task_queue.join().err();
73 match Arc::try_unwrap(self.task_returns) {
74 Ok(returns) => {
75 let returns = returns.write().drain(..).collect::<Vec<_>>();
76 (returns, error)
77 }
78 _ => {
79 unreachable!("Since all references should be weak, this shouldn't be possible")
80 }
81 }
82 }
83}
84
85mod hidden {
87 use super::*;
88 use crate::logging::LOGGING_CONTROL;
89
90 use crate::project::shared::WeakSharedProject;
91 use crate::work_queue::ToWorkToken;
92 use std::sync::Weak;
93 use std::thread;
94
95 pub struct TaskWork {
96 exec: Box<dyn ExecutableTask>,
97 project: WeakSharedProject,
98 return_vec: Arc<RwLock<Vec<(TaskId, BuildResult<(bool, bool)>)>>>,
99 }
100
101 impl TaskWork {
102 pub fn new(
103 exec: Box<dyn ExecutableTask>,
104 project: &SharedProject,
105 return_vec: &Arc<RwLock<Vec<(TaskId, BuildResult<(bool, bool)>)>>>,
106 ) -> Self {
107 Self {
108 exec,
109 project: project.weak(),
110 return_vec: return_vec.clone(),
111 }
112 }
113 }
114
115 impl ToWorkToken for TaskWork {
116 fn on_start(&self) -> Box<dyn Fn() + Send + Sync> {
117 let id = self.exec.task_id();
118 Box::new(move || {
119 LOGGING_CONTROL.start_task(&id);
120 LOGGING_CONTROL.in_task(id.clone());
121 trace!("{} starting task {}", thread::current().name().unwrap(), id);
122 })
123 }
124
125 fn on_complete(&self) -> Box<dyn Fn() + Send + Sync> {
126 let id = self.exec.task_id();
127 Box::new(move || {
128 trace!("{} finished task {}", thread::current().name().unwrap(), id);
129 LOGGING_CONTROL.end_task(&id);
130 LOGGING_CONTROL.reset();
131 })
132 }
133
134 fn work(mut self) {
135 let upgraded_project = self
136 .project
137 .upgrade()
138 .expect("Project dropped but task attempting to be ran");
139 upgraded_project.with(|project| {
140 let output = { self.exec.execute(&*project) };
141 let up_to_date = self.exec.task_up_to_date();
142 let did_work = self.exec.did_work();
143 let mut write_guard = self.return_vec.write();
144
145 let status = (self.exec.task_id(), output.map(|_| (up_to_date, did_work)));
146 write_guard.push(status);
147 })
148 }
149 }
150}