1use std::{
2 collections::{BTreeMap, VecDeque},
3 env::Vars,
4 path::PathBuf,
5 process::{Command, Stdio},
6 sync::{Arc, RwLock},
7 time::SystemTime,
8};
9
10use chrono::{DateTime, Utc};
11use dadk_config::user::UserCleanLevel;
12use log::{debug, error, info, warn};
13
14use crate::{
15 context::{Action, DadkUserExecuteContext},
16 executor::cache::CacheDir,
17 parser::{
18 task::{CodeSource, PrebuiltSource, TaskType},
19 task_log::{BuildStatus, InstallStatus, TaskLog},
20 },
21 scheduler::{SchedEntities, SchedEntity},
22 utils::{file::FileUtils, path::abs_path},
23};
24
25use dadk_config::common::task::TaskEnv;
26
27use self::cache::{CacheDirType, TaskDataDir};
28
29pub mod cache;
30pub mod source;
31#[cfg(test)]
32mod tests;
33
34lazy_static! {
35 pub static ref ENV_LIST: RwLock<EnvMap> = RwLock::new(EnvMap::new());
37}
38
39#[derive(Debug, Clone)]
40pub struct Executor {
41 entity: Arc<SchedEntity>,
42 action: Action,
43 local_envs: EnvMap,
44 build_dir: CacheDir,
46 source_dir: Option<CacheDir>,
48 task_data_dir: TaskDataDir,
50 dragonos_sysroot: PathBuf,
52}
53
54impl Executor {
55 pub fn new(
68 entity: Arc<SchedEntity>,
69 action: Action,
70 dragonos_sysroot: PathBuf,
71 ) -> Result<Self, ExecutorError> {
72 let local_envs = EnvMap::new();
73 let build_dir = CacheDir::new(entity.clone(), CacheDirType::Build)?;
74 let task_data_dir = TaskDataDir::new(entity.clone())?;
75
76 let source_dir = if CacheDir::need_source_cache(&entity) {
77 Some(CacheDir::new(entity.clone(), CacheDirType::Source)?)
78 } else {
79 None
80 };
81
82 let result: Executor = Self {
83 action,
84 entity,
85 local_envs,
86 build_dir,
87 source_dir,
88 task_data_dir,
89 dragonos_sysroot,
90 };
91
92 return Ok(result);
93 }
94
95 pub fn execute(&mut self) -> Result<(), ExecutorError> {
105 info!("Execute task: {}", self.entity.task().name_version());
106
107 let r = self.do_execute();
108 self.save_task_data(r.clone());
109 info!("Task {} finished", self.entity.task().name_version());
110 return r;
111 }
112
113 fn save_task_data(&self, r: Result<(), ExecutorError>) {
115 let mut task_log = self.task_data_dir.task_log();
116 match self.action {
117 Action::Build => {
118 if r.is_ok() {
119 task_log.set_build_status(BuildStatus::Success);
120 } else {
121 task_log.set_build_status(BuildStatus::Failed);
122 }
123
124 task_log.set_build_time_now();
125 }
126
127 Action::Install => {
128 if r.is_ok() {
129 task_log.set_install_status(InstallStatus::Success);
130 } else {
131 task_log.set_install_status(InstallStatus::Failed);
132 }
133 task_log.set_install_time_now();
134 }
135
136 Action::Clean(_) => {
137 task_log.clean_build_status();
138 task_log.clean_install_status();
139 }
140 }
141
142 self.task_data_dir
143 .save_task_log(&task_log)
144 .expect("Failed to save task log");
145 }
146
147 fn do_execute(&mut self) -> Result<(), ExecutorError> {
148 self.prepare_local_env()?;
150
151 match self.action {
152 Action::Build => {
153 self.pre_build()?;
155 self.build()?;
157 self.post_build()?;
159 }
160 Action::Install => {
161 self.install()?;
163 }
164 Action::Clean(_) => {
165 let r = self.clean();
167 if let Err(e) = r {
168 error!(
169 "Failed to clean task {}: {:?}",
170 self.entity.task().name_version(),
171 e
172 );
173 }
174 }
175 }
176
177 return Ok(());
178 }
179
180 fn pre_build(&mut self) -> Result<(), ExecutorError> {
181 if let Some(pre_build) = self.entity.task().build.pre_build {
182 let output = Command::new(pre_build)
183 .output()
184 .expect("Failed to execute pre_build script");
185
186 if output.status.success() {
188 info!("Pre-build script executed successfully");
189 } else {
190 error!("Pre-build script failed");
191 return Err(ExecutorError::TaskFailed(
192 "Pre-build script failed".to_string(),
193 ));
194 }
195 }
196 Ok(())
197 }
198
199 fn build(&mut self) -> Result<(), ExecutorError> {
200 if let Some(status) = self.task_log().build_status() {
201 if let Some(build_time) = self.task_log().build_time() {
202 let mut last_modified = last_modified_time(&self.entity.file_path(), build_time)?;
203 last_modified = core::cmp::max(
204 last_modified,
205 last_modified_time(&self.src_work_dir(), build_time)?,
206 );
207
208 if *status == BuildStatus::Success
209 && (self.entity.task().build_once || last_modified < *build_time)
210 {
211 info!(
212 "Task {} has been built successfully, skip build.",
213 self.entity.task().name_version()
214 );
215 return Ok(());
216 }
217 }
218 }
219
220 return self.do_build();
221 }
222
223 fn post_build(&mut self) -> Result<(), ExecutorError> {
224 if let Some(post_build) = self.entity.task().build.post_build {
225 let output = Command::new(post_build)
226 .output()
227 .expect("Failed to execute post_build script");
228
229 if output.status.success() {
231 info!("Post-build script executed successfully");
232 } else {
233 error!("Post-build script failed");
234 return Err(ExecutorError::TaskFailed(
235 "Post-build script failed".to_string(),
236 ));
237 }
238 }
239 Ok(())
240 }
241
242 fn do_build(&mut self) -> Result<(), ExecutorError> {
244 self.prepare_input()?;
246
247 let command: Option<Command> = self.create_command()?;
248 if let Some(cmd) = command {
249 self.run_command(cmd)?;
250 }
251
252 if self.build_dir.is_empty()? {
254 warn!(
255 "Task {}: build result is empty, do you forget to copy the result to [$DADK_CURRENT_BUILD_DIR]?",
256 self.entity.task().name_version(),
257 );
258 }
259 return Ok(());
260 }
261
262 fn install(&self) -> Result<(), ExecutorError> {
263 log::trace!("dadk-user: install {}", self.entity.task().name_version());
264 if let Some(status) = self.task_log().install_status() {
265 if let Some(install_time) = self.task_log().install_time() {
266 let last_modified = last_modified_time(&self.build_dir.path, install_time)?;
267 let last_modified = core::cmp::max(
268 last_modified,
269 last_modified_time(&self.entity.file_path(), install_time)?,
270 );
271
272 if *status == InstallStatus::Success
273 && (self.entity.task().install_once || last_modified < *install_time)
274 {
275 info!(
276 "install: Task {} not changed.",
277 self.entity.task().name_version()
278 );
279 return Ok(());
280 }
281 }
282 }
283 log::trace!(
284 "dadk-user: to do install {}",
285 self.entity.task().name_version()
286 );
287 return self.do_install();
288 }
289
290 fn do_install(&self) -> Result<(), ExecutorError> {
292 let binding = self.entity.task();
293 let in_dragonos_path = binding.install.in_dragonos_path.as_ref();
294 if in_dragonos_path.is_none() {
296 return Ok(());
297 }
298 info!("Installing task: {}", self.entity.task().name_version());
299 let mut in_dragonos_path = in_dragonos_path.unwrap().to_string_lossy().to_string();
300
301 debug!("in_dragonos_path: {}", in_dragonos_path);
302 {
304 let count_leading_slashes = in_dragonos_path.chars().take_while(|c| *c == '/').count();
305 in_dragonos_path = in_dragonos_path[count_leading_slashes..].to_string();
306 }
307 let install_path = abs_path(&self.dragonos_sysroot.join(in_dragonos_path));
309 debug!("install_path: {:?}", install_path);
310 std::fs::create_dir_all(&install_path).map_err(|e| {
312 ExecutorError::InstallError(format!("Failed to create install path: {}", e.to_string()))
313 })?;
314
315 let build_dir: PathBuf = self.build_dir.path.clone();
317 FileUtils::copy_dir_all(&build_dir, &install_path)
318 .map_err(|e| ExecutorError::InstallError(e))?;
319 info!("Task {} installed.", self.entity.task().name_version());
320
321 return Ok(());
322 }
323
324 fn clean(&self) -> Result<(), ExecutorError> {
325 let level = if let Action::Clean(l) = self.action {
326 l
327 } else {
328 panic!(
329 "BUG: clean() called with non-clean action. executor details: {:?}",
330 self
331 );
332 };
333 info!(
334 "Cleaning task: {}, level={level:?}",
335 self.entity.task().name_version()
336 );
337
338 let r: Result<(), ExecutorError> = match level {
339 UserCleanLevel::All => self.clean_all(),
340 UserCleanLevel::InSrc => self.clean_src(),
341 UserCleanLevel::Output => {
342 self.clean_target()?;
343 self.clean_cache()
344 }
345 };
346
347 if let Err(e) = r {
348 error!(
349 "Failed to clean task: {}, error message: {:?}",
350 self.entity.task().name_version(),
351 e
352 );
353 return Err(e);
354 }
355
356 return Ok(());
357 }
358
359 fn clean_all(&self) -> Result<(), ExecutorError> {
360 self.clean_src()?;
362 self.clean_target()?;
364 self.clean_cache()?;
366 return Ok(());
367 }
368
369 fn clean_src(&self) -> Result<(), ExecutorError> {
371 let cmd: Option<Command> = self.create_command()?;
372 if cmd.is_none() {
373 return Ok(());
375 }
376 info!(
377 "{}: Cleaning in source directory: {:?}",
378 self.entity.task().name_version(),
379 self.src_work_dir()
380 );
381
382 let cmd = cmd.unwrap();
383 self.run_command(cmd)?;
384 return Ok(());
385 }
386
387 fn clean_target(&self) -> Result<(), ExecutorError> {
389 info!(
390 "{}: Cleaning build target directory: {:?}",
391 self.entity.task().name_version(),
392 self.build_dir.path
393 );
394
395 return self.build_dir.remove_self_recursive();
396 }
397
398 fn clean_cache(&self) -> Result<(), ExecutorError> {
400 let cache_dir = self.source_dir.as_ref();
401 if cache_dir.is_none() {
402 return Ok(());
404 }
405 info!(
406 "{}: Cleaning cache directory: {}",
407 self.entity.task().name_version(),
408 self.src_work_dir().display()
409 );
410 return cache_dir.unwrap().remove_self_recursive();
411 }
412
413 fn src_work_dir(&self) -> PathBuf {
415 if let Some(local_path) = self.entity.task().source_path() {
416 return local_path;
417 }
418 return self.source_dir.as_ref().unwrap().path.clone();
419 }
420
421 fn task_log(&self) -> TaskLog {
422 return self.task_data_dir.task_log();
423 }
424
425 fn create_command(&self) -> Result<Option<Command>, ExecutorError> {
427 let raw_cmd = match self.entity.task().task_type {
429 TaskType::BuildFromSource(_) => match self.action {
430 Action::Build => self.entity.task().build.build_command.clone(),
431 Action::Clean(_) => self.entity.task().clean.clean_command.clone(),
432 _ => unimplemented!(
433 "create_command: Action {:?} not supported yet.",
434 self.action
435 ),
436 },
437
438 TaskType::InstallFromPrebuilt(_) => match self.action {
439 Action::Build => self.entity.task().build.build_command.clone(),
440 Action::Clean(_) => self.entity.task().clean.clean_command.clone(),
441 _ => unimplemented!(
442 "create_command: Action {:?} not supported yet.",
443 self.action
444 ),
445 },
446 };
447
448 if raw_cmd.is_none() {
449 return Ok(None);
450 }
451
452 let raw_cmd = raw_cmd.unwrap();
453
454 let mut command = Command::new("bash");
455 command.current_dir(self.src_work_dir());
456
457 command.arg("-c");
459 command.arg(raw_cmd);
460
461 let env_list = ENV_LIST.read().unwrap();
463 for (key, value) in env_list.envs.iter() {
464 command.env(key, value.value.clone());
468 }
469 drop(env_list);
470 for (key, value) in self.local_envs.envs.iter() {
471 debug!("Local env found: {}={}", key, value.value);
472 command.env(key, value.value.clone());
473 }
474
475 return Ok(Some(command));
476 }
477
478 fn prepare_local_env(&mut self) -> Result<(), ExecutorError> {
480 let binding = self.entity.task();
481 let task_envs: Option<&Vec<TaskEnv>> = binding.envs.as_ref();
482
483 if let Some(task_envs) = task_envs {
484 for tv in task_envs.iter() {
485 self.local_envs
486 .add(EnvVar::new(tv.key().to_string(), tv.value().to_string()));
487 }
488 }
489
490 self.local_envs.add(EnvVar::new(
492 "DADK_CURRENT_BUILD_DIR".to_string(),
493 self.build_dir.path.to_str().unwrap().to_string(),
494 ));
495
496 return Ok(());
497 }
498
499 fn prepare_input(&self) -> Result<(), ExecutorError> {
500 let task = self.entity.task();
502 match &task.task_type {
503 TaskType::BuildFromSource(cs) => {
504 if self.source_dir.is_none() {
505 return Ok(());
506 }
507 let source_dir = self.source_dir.as_ref().unwrap();
508 match cs {
509 CodeSource::Git(git) => {
510 git.prepare(source_dir)
511 .map_err(|e| ExecutorError::PrepareEnvError(e))?;
512 }
513 CodeSource::Local(_) => return Ok(()),
515 CodeSource::Archive(archive) => {
517 archive
518 .download_unzip(source_dir)
519 .map_err(|e| ExecutorError::PrepareEnvError(e))?;
520 }
521 }
522 }
523 TaskType::InstallFromPrebuilt(pb) => {
524 match pb {
525 PrebuiltSource::Local(local_source) => {
527 let local_path = local_source.path();
528 let target_path = &self.build_dir.path;
529 FileUtils::copy_dir_all(&local_path, &target_path)
530 .map_err(|e| ExecutorError::TaskFailed(e))?; return Ok(());
532 }
533 PrebuiltSource::Archive(archive) => {
535 archive
536 .download_unzip(&self.build_dir)
537 .map_err(|e| ExecutorError::PrepareEnvError(e))?;
538 }
539 }
540 }
541 }
542
543 return Ok(());
544 }
545
546 fn run_command(&self, mut command: Command) -> Result<(), ExecutorError> {
547 let mut child = command
548 .stdin(Stdio::inherit())
549 .spawn()
550 .map_err(|e| ExecutorError::IoError(e.to_string()))?;
551
552 let r = child
554 .wait()
555 .map_err(|e| ExecutorError::IoError(e.to_string()));
556 debug!("Command finished: {:?}", r);
557 if r.is_ok() {
558 let r = r.unwrap();
559 if r.success() {
560 return Ok(());
561 } else {
562 let errmsg = format!(
564 "Task {} failed, exit code = {}",
565 self.entity.task().name_version(),
566 r.code().unwrap()
567 );
568 error!("{errmsg}");
569 let command_opt = command.output();
570 if command_opt.is_err() {
571 return Err(ExecutorError::TaskFailed(
572 "Failed to get command output".to_string(),
573 ));
574 }
575 let command_opt = command_opt.unwrap();
576 let command_output = String::from_utf8_lossy(&command_opt.stderr);
577 let mut last_100_outputs = command_output
578 .lines()
579 .rev()
580 .take(100)
581 .collect::<Vec<&str>>();
582 last_100_outputs.reverse();
583 error!("Last 100 lines msg of stderr:");
584 for line in last_100_outputs {
585 error!("{}", line);
586 }
587 return Err(ExecutorError::TaskFailed(errmsg));
588 }
589 } else {
590 let errmsg = format!(
591 "Task {} failed, msg = {:?}",
592 self.entity.task().name_version(),
593 r.err().unwrap()
594 );
595 error!("{errmsg}");
596 return Err(ExecutorError::TaskFailed(errmsg));
597 }
598 }
599}
600
601#[derive(Debug, Clone)]
602pub struct EnvMap {
603 pub envs: BTreeMap<String, EnvVar>,
604}
605
606impl EnvMap {
607 pub fn new() -> Self {
608 Self {
609 envs: BTreeMap::new(),
610 }
611 }
612
613 pub fn add(&mut self, env: EnvVar) {
614 self.envs.insert(env.key.clone(), env);
615 }
616
617 #[allow(dead_code)]
618 pub fn get(&self, key: &str) -> Option<&EnvVar> {
619 self.envs.get(key)
620 }
621
622 pub fn add_vars(&mut self, vars: Vars) {
623 for (key, value) in vars {
624 self.add(EnvVar::new(key, value));
625 }
626 }
627}
628
629#[derive(Debug, PartialEq, PartialOrd, Eq, Ord, Clone)]
631pub struct EnvVar {
632 pub key: String,
633 pub value: String,
634}
635
636impl EnvVar {
637 pub fn new(key: String, value: String) -> Self {
638 Self { key, value }
639 }
640}
641
642#[allow(dead_code)]
644#[derive(Debug, Clone)]
645pub enum ExecutorError {
646 PrepareEnvError(String),
648 IoError(String),
649 TaskFailed(String),
651 InstallError(String),
653 CleanError(String),
655}
656
657pub fn prepare_env(
659 sched_entities: &SchedEntities,
660 execute_ctx: &Arc<DadkUserExecuteContext>,
661) -> Result<(), ExecutorError> {
662 info!("Preparing environment variables...");
663 let env_list = create_global_env_list(sched_entities, execute_ctx)?;
664 let mut global_env_list = ENV_LIST.write().unwrap();
666 *global_env_list = env_list;
667 return Ok(());
668}
669
670fn create_global_env_list(
672 sched_entities: &SchedEntities,
673 execute_ctx: &Arc<DadkUserExecuteContext>,
674) -> Result<EnvMap, ExecutorError> {
675 let mut env_list = EnvMap::new();
676 let envs: Vars = std::env::vars();
677 env_list.add_vars(envs);
678
679 for entity in sched_entities.entities().iter() {
681 let build_dir = CacheDir::build_dir(entity.clone())?;
683
684 let build_dir_key = CacheDir::build_dir_env_key(&entity)?;
685 env_list.add(EnvVar::new(
686 build_dir_key,
687 build_dir.to_str().unwrap().to_string(),
688 ));
689
690 if CacheDir::need_source_cache(entity) {
692 let source_dir = CacheDir::source_dir(entity.clone())?;
693 let source_dir_key = CacheDir::source_dir_env_key(&entity)?;
694 env_list.add(EnvVar::new(
695 source_dir_key,
696 source_dir.to_str().unwrap().to_string(),
697 ));
698 }
699 }
700
701 let target_arch = execute_ctx.target_arch();
703 env_list.add(EnvVar::new("ARCH".to_string(), (*target_arch).into()));
704
705 return Ok(env_list);
706}
707
708fn last_modified_time(
715 path: &PathBuf,
716 build_time: &DateTime<Utc>,
717) -> Result<DateTime<Utc>, ExecutorError> {
718 let mut queue = VecDeque::new();
719 queue.push_back(path.clone());
720
721 let mut last_modified = DateTime::<Utc>::from(SystemTime::UNIX_EPOCH);
722
723 while let Some(current_path) = queue.pop_front() {
724 let metadata = current_path
725 .metadata()
726 .map_err(|e| ExecutorError::InstallError(e.to_string()))?;
727
728 if metadata.is_dir() {
729 for r in std::fs::read_dir(¤t_path).unwrap() {
730 if let Ok(entry) = r {
731 if entry.file_name() == "target" {
733 continue;
734 }
735
736 let entry_path = entry.path();
737 let entry_metadata = entry.metadata().unwrap();
738 let file_modified = DateTime::<Utc>::from(entry_metadata.modified().unwrap());
740 last_modified = std::cmp::max(last_modified, file_modified);
741
742 if last_modified > *build_time {
744 return Ok(last_modified);
745 }
746
747 if entry_metadata.is_dir() {
748 queue.push_back(entry_path);
750 }
751 }
752 }
753 } else {
754 let file_modified = DateTime::<Utc>::from(metadata.modified().unwrap());
756 last_modified = std::cmp::max(last_modified, file_modified);
757
758 if last_modified > *build_time {
760 return Ok(last_modified);
761 }
762 }
763 }
764
765 if last_modified == DateTime::<Utc>::from(SystemTime::UNIX_EPOCH) {
766 return Err(ExecutorError::InstallError(format!(
767 "Failed to get last modified time for path: {}",
768 path.display()
769 )));
770 }
771 Ok(last_modified)
772}