assemble_core/task/
task_executor.rs

1use crate::identifier::TaskId;
2use crate::project::Project;
3
4use crate::task::task_executor::hidden::TaskWork;
5use crate::task::ExecutableTask;
6
7use crate::work_queue::{TypedWorkerQueue, WorkerExecutor};
8use crate::BuildResult;
9use std::any::Any;
10use std::sync::Arc;
11
12use crate::project::finder::{ProjectFinder, ProjectPath, ProjectPathBuf};
13use crate::project::shared::SharedProject;
14use parking_lot::RwLock;
15use std::io;
16
/// The task executor. Implemented on top of a thread pool to maximize parallelism.
///
/// Tasks are handed over with [`TaskExecutor::queue_task`]; their outcomes are collected with
/// [`TaskExecutor::finished_tasks`] or, once no more tasks will be queued, with
/// [`TaskExecutor::finish`].
pub struct TaskExecutor<'exec> {
    /// Typed queue that every queued [`TaskWork`] is submitted to.
    task_queue: TypedWorkerQueue<'exec, TaskWork>,
    /// Project handed to the `ProjectFinder` when looking up the project a queued task
    /// belongs to.
    project: SharedProject,
    /// Outcomes of completed tasks, shared with every queued [`TaskWork`]. On success each
    /// entry holds `(up_to_date, did_work)` for the task identified by the paired `TaskId`.
    task_returns: Arc<RwLock<Vec<(TaskId, BuildResult<(bool, bool)>)>>>,
}
23
24impl<'exec> TaskExecutor<'exec> {
25    /// Create a new task executor
26    pub fn new(project: SharedProject, executor: &'exec WorkerExecutor) -> Self {
27        let typed_queue = executor.queue().typed();
28        Self {
29            task_queue: typed_queue,
30            project,
31            task_returns: Default::default(),
32        }
33    }
34
35    /// Queue a task to be executed
36    pub fn queue_task<E: ExecutableTask + 'static>(&mut self, task: E) -> io::Result<()> {
37        let project = task
38            .task_id()
39            .project_id()
40            .expect("project id should always exist at this point");
41        trace!(
42            "finding project {} to execute {} with",
43            project,
44            task.task_id()
45        );
46
47        let finder = ProjectFinder::new(&self.project);
48        let project = finder
49            .find(&ProjectPathBuf::from(project))
50            .expect("should exist");
51
52        let token = TaskWork::new(Box::new(task), &project, &self.task_returns);
53        let _ = self.task_queue.submit(token)?;
54        Ok(())
55    }
56
57    /// Gets finished tasks along with their build result. Does not repeat outputs, so the returned
58    /// vector must be used
59    #[must_use]
60    pub fn finished_tasks(&mut self) -> Vec<(TaskId, BuildResult<(bool, bool)>)> {
61        let mut guard = self.task_returns.write();
62        guard.drain(..).collect()
63    }
64
65    /// Wait for all running and queued tasks to finish.
66    pub fn finish(
67        self,
68    ) -> (
69        Vec<(TaskId, BuildResult<(bool, bool)>)>,
70        Option<Box<dyn Any + Send + 'static>>,
71    ) {
72        let error = self.task_queue.join().err();
73        match Arc::try_unwrap(self.task_returns) {
74            Ok(returns) => {
75                let returns = returns.write().drain(..).collect::<Vec<_>>();
76                (returns, error)
77            }
78            _ => {
79                unreachable!("Since all references should be weak, this shouldn't be possible")
80            }
81        }
82    }
83}
84
85/// Hides implementation details for TaskWork
86mod hidden {
87    use super::*;
88    use crate::logging::LOGGING_CONTROL;
89
90    use crate::project::shared::WeakSharedProject;
91    use crate::work_queue::ToWorkToken;
92    use std::sync::Weak;
93    use std::thread;
94
95    pub struct TaskWork {
96        exec: Box<dyn ExecutableTask>,
97        project: WeakSharedProject,
98        return_vec: Arc<RwLock<Vec<(TaskId, BuildResult<(bool, bool)>)>>>,
99    }
100
101    impl TaskWork {
102        pub fn new(
103            exec: Box<dyn ExecutableTask>,
104            project: &SharedProject,
105            return_vec: &Arc<RwLock<Vec<(TaskId, BuildResult<(bool, bool)>)>>>,
106        ) -> Self {
107            Self {
108                exec,
109                project: project.weak(),
110                return_vec: return_vec.clone(),
111            }
112        }
113    }
114
115    impl ToWorkToken for TaskWork {
116        fn on_start(&self) -> Box<dyn Fn() + Send + Sync> {
117            let id = self.exec.task_id();
118            Box::new(move || {
119                LOGGING_CONTROL.start_task(&id);
120                LOGGING_CONTROL.in_task(id.clone());
121                trace!("{} starting task {}", thread::current().name().unwrap(), id);
122            })
123        }
124
125        fn on_complete(&self) -> Box<dyn Fn() + Send + Sync> {
126            let id = self.exec.task_id();
127            Box::new(move || {
128                trace!("{} finished task {}", thread::current().name().unwrap(), id);
129                LOGGING_CONTROL.end_task(&id);
130                LOGGING_CONTROL.reset();
131            })
132        }
133
134        fn work(mut self) {
135            let upgraded_project = self
136                .project
137                .upgrade()
138                .expect("Project dropped but task attempting to be ran");
139            upgraded_project.with(|project| {
140                let output = { self.exec.execute(&*project) };
141                let up_to_date = self.exec.task_up_to_date();
142                let did_work = self.exec.did_work();
143                let mut write_guard = self.return_vec.write();
144
145                let status = (self.exec.task_id(), output.map(|_| (up_to_date, did_work)));
146                write_guard.push(status);
147            })
148        }
149    }
150}