1use std::{
2 collections::{BTreeMap, VecDeque},
3 env::Vars,
4 path::PathBuf,
5 process::{Command, Stdio},
6 sync::{Arc, RwLock},
7 time::SystemTime,
8};
9
10use chrono::{DateTime, Utc};
11use dadk_config::user::UserCleanLevel;
12use log::{debug, error, info, warn};
13
14use crate::{
15 context::{Action, DadkUserExecuteContext},
16 executor::cache::CacheDir,
17 parser::{
18 task::{CodeSource, PrebuiltSource, TaskType},
19 task_log::{BuildStatus, InstallStatus, TaskLog},
20 },
21 scheduler::{SchedEntities, SchedEntity},
22 utils::{file::FileUtils, path::abs_path},
23};
24
25use dadk_config::common::task::TaskEnv;
26
27use self::cache::{CacheDirType, TaskDataDir};
28
29pub mod cache;
30pub mod source;
31#[cfg(test)]
32mod tests;
33
34lazy_static! {
35 pub static ref ENV_LIST: RwLock<EnvMap> = RwLock::new(EnvMap::new());
37}
38
39#[derive(Debug, Clone)]
40pub struct Executor {
41 entity: Arc<SchedEntity>,
42 action: Action,
43 local_envs: EnvMap,
44 build_dir: CacheDir,
46 source_dir: Option<CacheDir>,
48 task_data_dir: TaskDataDir,
50 dragonos_sysroot: PathBuf,
52}
53
54impl Executor {
55 pub fn new(
68 entity: Arc<SchedEntity>,
69 action: Action,
70 dragonos_sysroot: PathBuf,
71 ) -> Result<Self, ExecutorError> {
72 let local_envs = EnvMap::new();
73 let build_dir = CacheDir::new(entity.clone(), CacheDirType::Build)?;
74 let task_data_dir = TaskDataDir::new(entity.clone())?;
75
76 let source_dir = if CacheDir::need_source_cache(&entity) {
77 Some(CacheDir::new(entity.clone(), CacheDirType::Source)?)
78 } else {
79 None
80 };
81
82 let result: Executor = Self {
83 action,
84 entity,
85 local_envs,
86 build_dir,
87 source_dir,
88 task_data_dir,
89 dragonos_sysroot,
90 };
91
92 return Ok(result);
93 }
94
95 pub fn execute(&mut self) -> Result<(), ExecutorError> {
105 info!("Execute task: {}", self.entity.task().name_version());
106
107 let r = self.do_execute();
108 self.save_task_data(r.clone());
109 info!("Task {} finished", self.entity.task().name_version());
110 return r;
111 }
112
113 fn save_task_data(&self, r: Result<(), ExecutorError>) {
115 let mut task_log = self.task_data_dir.task_log();
116 match self.action {
117 Action::Build => {
118 if r.is_ok() {
119 task_log.set_build_status(BuildStatus::Success);
120 } else {
121 task_log.set_build_status(BuildStatus::Failed);
122 }
123
124 task_log.set_build_time_now();
125 }
126
127 Action::Install => {
128 if r.is_ok() {
129 task_log.set_install_status(InstallStatus::Success);
130 } else {
131 task_log.set_install_status(InstallStatus::Failed);
132 }
133 task_log.set_install_time_now();
134 }
135
136 Action::Clean(_) => {
137 task_log.clean_build_status();
138 task_log.clean_install_status();
139 }
140 }
141
142 if let Some(ts) = self.entity.config_file_timestamp() {
144 task_log.set_dadk_config_timestamp(ts);
145 }
146
147 self.task_data_dir
148 .save_task_log(&task_log)
149 .expect("Failed to save task log");
150 }
151
152 fn do_execute(&mut self) -> Result<(), ExecutorError> {
153 self.prepare_local_env()?;
155
156 match self.action {
157 Action::Build => {
158 self.pre_build()?;
160 self.build()?;
162 self.post_build()?;
164 }
165 Action::Install => {
166 self.install()?;
168 }
169 Action::Clean(_) => {
170 let r = self.clean();
172 if let Err(e) = r {
173 error!(
174 "Failed to clean task {}: {:?}",
175 self.entity.task().name_version(),
176 e
177 );
178 }
179 }
180 }
181
182 return Ok(());
183 }
184
185 fn pre_build(&mut self) -> Result<(), ExecutorError> {
186 if let Some(pre_build) = self.entity.task().build.pre_build {
187 let output = Command::new(pre_build)
188 .output()
189 .expect("Failed to execute pre_build script");
190
191 if output.status.success() {
193 info!("Pre-build script executed successfully");
194 } else {
195 error!("Pre-build script failed");
196 return Err(ExecutorError::TaskFailed(
197 "Pre-build script failed".to_string(),
198 ));
199 }
200 }
201 Ok(())
202 }
203
204 fn needs_build(&self) -> Result<bool, ExecutorError> {
205 if self.config_file_updated() {
207 return Ok(true);
208 }
209 let task_log = self.task_log();
210 let build_status = task_log.build_status();
211 if build_status.is_none() {
212 return Ok(true);
213 }
214
215 let build_time = task_log.build_time();
216 if build_time.is_none() {
217 return Ok(true);
218 }
219
220 let build_status = build_status.unwrap();
221 let build_time = build_time.unwrap();
222
223 let mut last_modified = last_modified_time(&self.entity.file_path(), build_time)?;
224 if let Some(ref d) = self.src_work_dir() {
225 last_modified = core::cmp::max(last_modified, last_modified_time(d, build_time)?);
226 }
227
228 if *build_status == BuildStatus::Success
230 && (self.entity.task().build_once || last_modified < *build_time)
231 {
232 return Ok(false);
233 }
234
235 return Ok(true);
236 }
237
238 fn config_file_updated(&self) -> bool {
239 let task_log = self.task_log();
240 task_log.dadk_config_timestamp() != self.entity.config_file_timestamp().as_ref()
241 }
242
243 fn build(&mut self) -> Result<(), ExecutorError> {
244 if !self.needs_build().unwrap_or(true) {
245 log::info!(
246 "No need to build: {}, skipping...",
247 self.entity.task().name_version()
248 );
249 return Ok(());
250 }
251
252 return self.do_build();
253 }
254
255 fn post_build(&mut self) -> Result<(), ExecutorError> {
256 if let Some(post_build) = self.entity.task().build.post_build {
257 let output = Command::new(post_build)
258 .output()
259 .expect("Failed to execute post_build script");
260
261 if output.status.success() {
263 info!("Post-build script executed successfully");
264 } else {
265 error!("Post-build script failed");
266 return Err(ExecutorError::TaskFailed(
267 "Post-build script failed".to_string(),
268 ));
269 }
270 }
271 Ok(())
272 }
273
274 fn do_build(&mut self) -> Result<(), ExecutorError> {
276 self.prepare_input()?;
278
279 let command: Option<Command> = self.create_command()?;
280 if let Some(cmd) = command {
281 self.run_command(cmd)?;
282 }
283
284 if self.build_dir.is_empty()? {
286 warn!(
287 "Task {}: build result is empty, do you forget to copy the result to [$DADK_CURRENT_BUILD_DIR]?",
288 self.entity.task().name_version(),
289 );
290 }
291 return Ok(());
292 }
293
294 fn needs_install(&self) -> Result<bool, ExecutorError> {
295 if self.config_file_updated() {
297 return Ok(true);
298 }
299 let task_log = self.task_log();
300 let install_status = task_log.install_status();
301 if install_status.is_none() {
302 return Ok(true);
303 }
304
305 let install_time = task_log.install_time();
306 if install_time.is_none() {
307 return Ok(true);
308 }
309
310 let install_status = install_status.unwrap();
311 let install_time = install_time.unwrap();
312
313 let last_modified = last_modified_time(&self.build_dir.path, install_time)?;
314 let last_modified = core::cmp::max(
315 last_modified,
316 last_modified_time(&self.entity.file_path(), install_time)?,
317 );
318
319 if *install_status == InstallStatus::Success
321 && (self.entity.task().install_once || last_modified < *install_time)
322 {
323 return Ok(false);
324 }
325
326 return Ok(true);
327 }
328
329 fn install(&self) -> Result<(), ExecutorError> {
330 log::trace!("dadk-user: install {}", self.entity.task().name_version());
331 if !self.needs_install().unwrap_or(false) {
332 info!(
333 "install: Task {} not changed.",
334 self.entity.task().name_version()
335 );
336 return Ok(());
337 }
338 log::trace!(
339 "dadk-user: to do install {}",
340 self.entity.task().name_version()
341 );
342 return self.do_install();
343 }
344
345 fn do_install(&self) -> Result<(), ExecutorError> {
347 let binding = self.entity.task();
348 let in_dragonos_path = binding.install.in_dragonos_path.as_ref();
349 if in_dragonos_path.is_none() {
351 return Ok(());
352 }
353 info!("Installing task: {}", self.entity.task().name_version());
354 let mut in_dragonos_path = in_dragonos_path.unwrap().to_string_lossy().to_string();
355
356 debug!("in_dragonos_path: {}", in_dragonos_path);
357 {
359 let count_leading_slashes = in_dragonos_path.chars().take_while(|c| *c == '/').count();
360 in_dragonos_path = in_dragonos_path[count_leading_slashes..].to_string();
361 }
362 let install_path = abs_path(&self.dragonos_sysroot.join(in_dragonos_path));
364 debug!("install_path: {:?}", install_path);
365 std::fs::create_dir_all(&install_path).map_err(|e| {
367 ExecutorError::InstallError(format!("Failed to create install path: {}", e.to_string()))
368 })?;
369
370 let build_dir: PathBuf = self.build_dir.path.clone();
372 FileUtils::copy_dir_all(&build_dir, &install_path)
373 .map_err(|e| ExecutorError::InstallError(e))?;
374 info!("Task {} installed.", self.entity.task().name_version());
375
376 return Ok(());
377 }
378
379 fn clean(&self) -> Result<(), ExecutorError> {
380 let level = if let Action::Clean(l) = self.action {
381 l
382 } else {
383 panic!(
384 "BUG: clean() called with non-clean action. executor details: {:?}",
385 self
386 );
387 };
388 info!(
389 "Cleaning task: {}, level={level:?}",
390 self.entity.task().name_version()
391 );
392
393 let r: Result<(), ExecutorError> = match level {
394 UserCleanLevel::All => self.clean_all(),
395 UserCleanLevel::InSrc => self.clean_src(),
396 UserCleanLevel::Output => {
397 self.clean_target()?;
398 self.clean_cache()
399 }
400 };
401
402 if let Err(e) = r {
403 error!(
404 "Failed to clean task: {}, error message: {:?}",
405 self.entity.task().name_version(),
406 e
407 );
408 return Err(e);
409 }
410
411 return Ok(());
412 }
413
414 fn clean_all(&self) -> Result<(), ExecutorError> {
415 self.clean_src()?;
417 self.clean_target()?;
419 self.clean_cache()?;
421 return Ok(());
422 }
423
424 fn clean_src(&self) -> Result<(), ExecutorError> {
426 let cmd: Option<Command> = self.create_command()?;
427 if cmd.is_none() {
428 return Ok(());
430 }
431 info!(
432 "{}: Cleaning in source directory: {:?}",
433 self.entity.task().name_version(),
434 self.src_work_dir()
435 );
436
437 let cmd = cmd.unwrap();
438 self.run_command(cmd)?;
439 return Ok(());
440 }
441
442 fn clean_target(&self) -> Result<(), ExecutorError> {
444 info!(
445 "{}: Cleaning build target directory: {:?}",
446 self.entity.task().name_version(),
447 self.build_dir.path
448 );
449
450 return self.build_dir.remove_self_recursive();
451 }
452
453 fn clean_cache(&self) -> Result<(), ExecutorError> {
455 let cache_dir = self.source_dir.as_ref();
456 if cache_dir.is_none() {
457 return Ok(());
459 }
460 info!(
461 "{}: Cleaning cache directory: {}",
462 self.entity.task().name_version(),
463 cache_dir.as_ref().unwrap().path.display()
464 );
465 return cache_dir.unwrap().remove_self_recursive();
466 }
467
468 fn src_work_dir(&self) -> Option<PathBuf> {
470 if let Some(local_path) = self.entity.task().source_path() {
471 return Some(local_path);
472 }
473 return Some(self.source_dir.as_ref()?.path.clone());
474 }
475
476 fn task_log(&self) -> TaskLog {
477 return self.task_data_dir.task_log();
478 }
479
480 fn create_command(&self) -> Result<Option<Command>, ExecutorError> {
482 let raw_cmd = match self.entity.task().task_type {
484 TaskType::BuildFromSource(_) => match self.action {
485 Action::Build => self.entity.task().build.build_command.clone(),
486 Action::Clean(_) => self.entity.task().clean.clean_command.clone(),
487 _ => unimplemented!(
488 "create_command: Action {:?} not supported yet.",
489 self.action
490 ),
491 },
492
493 TaskType::InstallFromPrebuilt(_) => match self.action {
494 Action::Build => self.entity.task().build.build_command.clone(),
495 Action::Clean(_) => self.entity.task().clean.clean_command.clone(),
496 _ => unimplemented!(
497 "create_command: Action {:?} not supported yet.",
498 self.action
499 ),
500 },
501 };
502
503 if raw_cmd.is_none() {
504 return Ok(None);
505 }
506
507 let raw_cmd = raw_cmd.unwrap();
508
509 let mut command = Command::new("bash");
510 command.current_dir(self.src_work_dir().unwrap());
511
512 command.arg("-c");
514 command.arg(raw_cmd);
515
516 let env_list = ENV_LIST.read().unwrap();
518 for (key, value) in env_list.envs.iter() {
519 command.env(key, value.value.clone());
523 }
524 drop(env_list);
525 for (key, value) in self.local_envs.envs.iter() {
526 debug!("Local env found: {}={}", key, value.value);
527 command.env(key, value.value.clone());
528 }
529
530 return Ok(Some(command));
531 }
532
533 fn prepare_local_env(&mut self) -> Result<(), ExecutorError> {
535 let binding = self.entity.task();
536 let task_envs: Option<&Vec<TaskEnv>> = binding.envs.as_ref();
537
538 if let Some(task_envs) = task_envs {
539 for tv in task_envs.iter() {
540 self.local_envs
541 .add(EnvVar::new(tv.key().to_string(), tv.value().to_string()));
542 }
543 }
544
545 self.local_envs.add(EnvVar::new(
547 "DADK_CURRENT_BUILD_DIR".to_string(),
548 self.build_dir.path.to_str().unwrap().to_string(),
549 ));
550
551 return Ok(());
552 }
553
554 fn prepare_input(&self) -> Result<(), ExecutorError> {
555 let task = self.entity.task();
557 match &task.task_type {
558 TaskType::BuildFromSource(cs) => {
559 if self.source_dir.is_none() {
560 return Ok(());
561 }
562 let source_dir = self.source_dir.as_ref().unwrap();
563 match cs {
564 CodeSource::Git(git) => {
565 git.prepare(source_dir)
566 .map_err(|e| ExecutorError::PrepareEnvError(e))?;
567 }
568 CodeSource::Local(_) => return Ok(()),
570 CodeSource::Archive(archive) => {
572 archive
573 .download_unzip(source_dir)
574 .map_err(|e| ExecutorError::PrepareEnvError(e))?;
575 }
576 }
577 }
578 TaskType::InstallFromPrebuilt(pb) => {
579 match pb {
580 PrebuiltSource::Local(local_source) => {
582 let local_path = local_source.path();
583 let target_path = &self.build_dir.path;
584 FileUtils::copy_dir_all(&local_path, &target_path)
585 .map_err(|e| ExecutorError::TaskFailed(e))?; return Ok(());
587 }
588 PrebuiltSource::Archive(archive) => {
590 archive
591 .download_unzip(&self.build_dir)
592 .map_err(|e| ExecutorError::PrepareEnvError(e))?;
593 }
594 }
595 }
596 }
597
598 return Ok(());
599 }
600
601 fn run_command(&self, mut command: Command) -> Result<(), ExecutorError> {
602 let mut child = command
603 .stdin(Stdio::inherit())
604 .spawn()
605 .map_err(|e| ExecutorError::IoError(e.to_string()))?;
606
607 let r = child
609 .wait()
610 .map_err(|e| ExecutorError::IoError(e.to_string()));
611 debug!("Command finished: {:?}", r);
612 if r.is_ok() {
613 let r = r.unwrap();
614 if r.success() {
615 return Ok(());
616 } else {
617 let errmsg = format!(
619 "Task {} failed, exit code = {}",
620 self.entity.task().name_version(),
621 r.code().unwrap()
622 );
623 error!("{errmsg}");
624 let command_opt = command.output();
625 if command_opt.is_err() {
626 return Err(ExecutorError::TaskFailed(
627 "Failed to get command output".to_string(),
628 ));
629 }
630 let command_opt = command_opt.unwrap();
631 let command_output = String::from_utf8_lossy(&command_opt.stderr);
632 let mut last_100_outputs = command_output
633 .lines()
634 .rev()
635 .take(100)
636 .collect::<Vec<&str>>();
637 last_100_outputs.reverse();
638 error!("Last 100 lines msg of stderr:");
639 for line in last_100_outputs {
640 error!("{}", line);
641 }
642 return Err(ExecutorError::TaskFailed(errmsg));
643 }
644 } else {
645 let errmsg = format!(
646 "Task {} failed, msg = {:?}",
647 self.entity.task().name_version(),
648 r.err().unwrap()
649 );
650 error!("{errmsg}");
651 return Err(ExecutorError::TaskFailed(errmsg));
652 }
653 }
654}
655
656#[derive(Debug, Clone)]
657pub struct EnvMap {
658 pub envs: BTreeMap<String, EnvVar>,
659}
660
661impl EnvMap {
662 pub fn new() -> Self {
663 Self {
664 envs: BTreeMap::new(),
665 }
666 }
667
668 pub fn add(&mut self, env: EnvVar) {
669 self.envs.insert(env.key.clone(), env);
670 }
671
672 #[allow(dead_code)]
673 pub fn get(&self, key: &str) -> Option<&EnvVar> {
674 self.envs.get(key)
675 }
676
677 pub fn add_vars(&mut self, vars: Vars) {
678 for (key, value) in vars {
679 self.add(EnvVar::new(key, value));
680 }
681 }
682}
683
684#[derive(Debug, PartialEq, PartialOrd, Eq, Ord, Clone)]
686pub struct EnvVar {
687 pub key: String,
688 pub value: String,
689}
690
691impl EnvVar {
692 pub fn new(key: String, value: String) -> Self {
693 Self { key, value }
694 }
695}
696
697#[allow(dead_code)]
699#[derive(Debug, Clone)]
700pub enum ExecutorError {
701 PrepareEnvError(String),
703 IoError(String),
704 TaskFailed(String),
706 InstallError(String),
708 CleanError(String),
710}
711
712pub fn prepare_env(
714 sched_entities: &SchedEntities,
715 execute_ctx: &Arc<DadkUserExecuteContext>,
716) -> Result<(), ExecutorError> {
717 info!("Preparing environment variables...");
718 let env_list = create_global_env_list(sched_entities, execute_ctx)?;
719 let mut global_env_list = ENV_LIST.write().unwrap();
721 *global_env_list = env_list;
722 return Ok(());
723}
724
725fn create_global_env_list(
727 sched_entities: &SchedEntities,
728 execute_ctx: &Arc<DadkUserExecuteContext>,
729) -> Result<EnvMap, ExecutorError> {
730 let mut env_list = EnvMap::new();
731 let envs: Vars = std::env::vars();
732 env_list.add_vars(envs);
733
734 for entity in sched_entities.entities().iter() {
736 let build_dir = CacheDir::build_dir(entity.clone())?;
738
739 let build_dir_key = CacheDir::build_dir_env_key(&entity)?;
740 env_list.add(EnvVar::new(
741 build_dir_key,
742 build_dir.to_str().unwrap().to_string(),
743 ));
744
745 if CacheDir::need_source_cache(entity) {
747 let source_dir = CacheDir::source_dir(entity.clone())?;
748 let source_dir_key = CacheDir::source_dir_env_key(&entity)?;
749 env_list.add(EnvVar::new(
750 source_dir_key,
751 source_dir.to_str().unwrap().to_string(),
752 ));
753 }
754 }
755
756 let target_arch = execute_ctx.target_arch();
758 env_list.add(EnvVar::new("ARCH".to_string(), (*target_arch).into()));
759
760 return Ok(env_list);
761}
762
763fn last_modified_time(
770 path: &PathBuf,
771 build_time: &DateTime<Utc>,
772) -> Result<DateTime<Utc>, ExecutorError> {
773 let mut queue = VecDeque::new();
774 queue.push_back(path.clone());
775
776 let mut last_modified = DateTime::<Utc>::from(SystemTime::UNIX_EPOCH);
777
778 while let Some(current_path) = queue.pop_front() {
779 let metadata = current_path
780 .metadata()
781 .map_err(|e| ExecutorError::InstallError(e.to_string()))?;
782
783 if metadata.is_dir() {
784 for r in std::fs::read_dir(¤t_path).unwrap() {
785 if let Ok(entry) = r {
786 if entry.file_name() == "target" {
788 continue;
789 }
790
791 let entry_path = entry.path();
792 let entry_metadata = entry.metadata().unwrap();
793 let file_modified = DateTime::<Utc>::from(entry_metadata.modified().unwrap());
795 last_modified = std::cmp::max(last_modified, file_modified);
796
797 if last_modified > *build_time {
799 return Ok(last_modified);
800 }
801
802 if entry_metadata.is_dir() {
803 queue.push_back(entry_path);
805 }
806 }
807 }
808 } else {
809 let file_modified = DateTime::<Utc>::from(metadata.modified().unwrap());
811 last_modified = std::cmp::max(last_modified, file_modified);
812
813 if last_modified > *build_time {
815 return Ok(last_modified);
816 }
817 }
818 }
819
820 if last_modified == DateTime::<Utc>::from(SystemTime::UNIX_EPOCH) {
821 return Err(ExecutorError::InstallError(format!(
822 "Failed to get last modified time for path: {}",
823 path.display()
824 )));
825 }
826 Ok(last_modified)
827}