dadk_user/executor/
mod.rs

1use std::{
2    collections::{BTreeMap, VecDeque},
3    env::Vars,
4    path::PathBuf,
5    process::{Command, Stdio},
6    sync::{Arc, RwLock},
7    time::SystemTime,
8};
9
10use chrono::{DateTime, Utc};
11use dadk_config::user::UserCleanLevel;
12use log::{debug, error, info, warn};
13
14use crate::{
15    context::{Action, DadkUserExecuteContext},
16    executor::cache::CacheDir,
17    parser::{
18        task::{CodeSource, PrebuiltSource, TaskType},
19        task_log::{BuildStatus, InstallStatus, TaskLog},
20    },
21    scheduler::{SchedEntities, SchedEntity},
22    utils::{file::FileUtils, path::abs_path},
23};
24
25use dadk_config::common::task::TaskEnv;
26
27use self::cache::{CacheDirType, TaskDataDir};
28
29pub mod cache;
30pub mod source;
31#[cfg(test)]
32mod tests;
33
34lazy_static! {
35    // 全局环境变量的列表
36    pub static ref ENV_LIST: RwLock<EnvMap> = RwLock::new(EnvMap::new());
37}
38
39#[derive(Debug, Clone)]
40pub struct Executor {
41    entity: Arc<SchedEntity>,
42    action: Action,
43    local_envs: EnvMap,
44    /// 任务构建结果输出到的目录
45    build_dir: CacheDir,
46    /// 如果任务需要源文件缓存,则此字段为 Some(CacheDir),否则为 None(使用本地源文件路径)
47    source_dir: Option<CacheDir>,
48    /// 任务数据目录
49    task_data_dir: TaskDataDir,
50    /// DragonOS sysroot的路径
51    dragonos_sysroot: PathBuf,
52}
53
54impl Executor {
55    /// # 创建执行器
56    ///
57    /// 用于执行一个任务
58    ///
59    /// ## 参数
60    ///
61    /// * `entity` - 任务调度实体
62    ///
63    /// ## 返回值
64    ///
65    /// * `Ok(Executor)` - 创建成功
66    /// * `Err(ExecutorError)` - 创建失败
67    pub fn new(
68        entity: Arc<SchedEntity>,
69        action: Action,
70        dragonos_sysroot: PathBuf,
71    ) -> Result<Self, ExecutorError> {
72        let local_envs = EnvMap::new();
73        let build_dir = CacheDir::new(entity.clone(), CacheDirType::Build)?;
74        let task_data_dir = TaskDataDir::new(entity.clone())?;
75
76        let source_dir = if CacheDir::need_source_cache(&entity) {
77            Some(CacheDir::new(entity.clone(), CacheDirType::Source)?)
78        } else {
79            None
80        };
81
82        let result: Executor = Self {
83            action,
84            entity,
85            local_envs,
86            build_dir,
87            source_dir,
88            task_data_dir,
89            dragonos_sysroot,
90        };
91
92        return Ok(result);
93    }
94
95    /// # 执行任务
96    ///
97    /// 创建执行器后,调用此方法执行任务。
98    /// 该方法会执行以下步骤:
99    ///
100    /// 1. 创建工作线程
101    /// 2. 准备环境变量
102    /// 3. 拉取数据(可选)
103    /// 4. 执行构建
104    pub fn execute(&mut self) -> Result<(), ExecutorError> {
105        info!("Execute task: {}", self.entity.task().name_version());
106
107        let r = self.do_execute();
108        self.save_task_data(r.clone());
109        info!("Task {} finished", self.entity.task().name_version());
110        return r;
111    }
112
113    /// # 保存任务数据
114    fn save_task_data(&self, r: Result<(), ExecutorError>) {
115        let mut task_log = self.task_data_dir.task_log();
116        match self.action {
117            Action::Build => {
118                if r.is_ok() {
119                    task_log.set_build_status(BuildStatus::Success);
120                } else {
121                    task_log.set_build_status(BuildStatus::Failed);
122                }
123
124                task_log.set_build_time_now();
125            }
126
127            Action::Install => {
128                if r.is_ok() {
129                    task_log.set_install_status(InstallStatus::Success);
130                } else {
131                    task_log.set_install_status(InstallStatus::Failed);
132                }
133                task_log.set_install_time_now();
134            }
135
136            Action::Clean(_) => {
137                task_log.clean_build_status();
138                task_log.clean_install_status();
139            }
140        }
141
142        self.task_data_dir
143            .save_task_log(&task_log)
144            .expect("Failed to save task log");
145    }
146
147    fn do_execute(&mut self) -> Result<(), ExecutorError> {
148        // 准备本地环境变量
149        self.prepare_local_env()?;
150
151        match self.action {
152            Action::Build => {
153                // 构建前的工作
154                self.pre_build()?;
155                // 构建任务
156                self.build()?;
157                // 构建完毕后的工作
158                self.post_build()?;
159            }
160            Action::Install => {
161                // 把构建结果安装到DragonOS
162                self.install()?;
163            }
164            Action::Clean(_) => {
165                // 清理构建结果
166                let r = self.clean();
167                if let Err(e) = r {
168                    error!(
169                        "Failed to clean task {}: {:?}",
170                        self.entity.task().name_version(),
171                        e
172                    );
173                }
174            }
175        }
176
177        return Ok(());
178    }
179
180    fn pre_build(&mut self) -> Result<(), ExecutorError> {
181        if let Some(pre_build) = self.entity.task().build.pre_build {
182            let output = Command::new(pre_build)
183                .output()
184                .expect("Failed to execute pre_build script");
185
186            // 检查脚本执行结果
187            if output.status.success() {
188                info!("Pre-build script executed successfully");
189            } else {
190                error!("Pre-build script failed");
191                return Err(ExecutorError::TaskFailed(
192                    "Pre-build script failed".to_string(),
193                ));
194            }
195        }
196        Ok(())
197    }
198
199    fn build(&mut self) -> Result<(), ExecutorError> {
200        if let Some(status) = self.task_log().build_status() {
201            if let Some(build_time) = self.task_log().build_time() {
202                let mut last_modified = last_modified_time(&self.entity.file_path(), build_time)?;
203                last_modified = core::cmp::max(
204                    last_modified,
205                    last_modified_time(&self.src_work_dir(), build_time)?,
206                );
207
208                if *status == BuildStatus::Success
209                    && (self.entity.task().build_once || last_modified < *build_time)
210                {
211                    info!(
212                        "Task {} has been built successfully, skip build.",
213                        self.entity.task().name_version()
214                    );
215                    return Ok(());
216                }
217            }
218        }
219
220        return self.do_build();
221    }
222
223    fn post_build(&mut self) -> Result<(), ExecutorError> {
224        if let Some(post_build) = self.entity.task().build.post_build {
225            let output = Command::new(post_build)
226                .output()
227                .expect("Failed to execute post_build script");
228
229            // 检查脚本执行结果
230            if output.status.success() {
231                info!("Post-build script executed successfully");
232            } else {
233                error!("Post-build script failed");
234                return Err(ExecutorError::TaskFailed(
235                    "Post-build script failed".to_string(),
236                ));
237            }
238        }
239        Ok(())
240    }
241
242    /// # 执行build操作
243    fn do_build(&mut self) -> Result<(), ExecutorError> {
244        // 确认源文件就绪
245        self.prepare_input()?;
246
247        let command: Option<Command> = self.create_command()?;
248        if let Some(cmd) = command {
249            self.run_command(cmd)?;
250        }
251
252        // 检查构建结果,如果为空,则抛出警告
253        if self.build_dir.is_empty()? {
254            warn!(
255                "Task {}: build result is empty, do you forget to copy the result to [$DADK_CURRENT_BUILD_DIR]?",
256                self.entity.task().name_version(),
257            );
258        }
259        return Ok(());
260    }
261
262    fn install(&self) -> Result<(), ExecutorError> {
263        log::trace!("dadk-user: install {}", self.entity.task().name_version());
264        if let Some(status) = self.task_log().install_status() {
265            if let Some(install_time) = self.task_log().install_time() {
266                let last_modified = last_modified_time(&self.build_dir.path, install_time)?;
267                let last_modified = core::cmp::max(
268                    last_modified,
269                    last_modified_time(&self.entity.file_path(), install_time)?,
270                );
271
272                if *status == InstallStatus::Success
273                    && (self.entity.task().install_once || last_modified < *install_time)
274                {
275                    info!(
276                        "install: Task {} not changed.",
277                        self.entity.task().name_version()
278                    );
279                    return Ok(());
280                }
281            }
282        }
283        log::trace!(
284            "dadk-user: to do install {}",
285            self.entity.task().name_version()
286        );
287        return self.do_install();
288    }
289
290    /// # 执行安装操作,把构建结果安装到DragonOS
291    fn do_install(&self) -> Result<(), ExecutorError> {
292        let binding = self.entity.task();
293        let in_dragonos_path = binding.install.in_dragonos_path.as_ref();
294        // 如果没有指定安装路径,则不执行安装
295        if in_dragonos_path.is_none() {
296            return Ok(());
297        }
298        info!("Installing task: {}", self.entity.task().name_version());
299        let mut in_dragonos_path = in_dragonos_path.unwrap().to_string_lossy().to_string();
300
301        debug!("in_dragonos_path: {}", in_dragonos_path);
302        // 去除开头的斜杠
303        {
304            let count_leading_slashes = in_dragonos_path.chars().take_while(|c| *c == '/').count();
305            in_dragonos_path = in_dragonos_path[count_leading_slashes..].to_string();
306        }
307        // 拼接最终的安装路径
308        let install_path = abs_path(&self.dragonos_sysroot.join(in_dragonos_path));
309        debug!("install_path: {:?}", install_path);
310        // 创建安装路径
311        std::fs::create_dir_all(&install_path).map_err(|e| {
312            ExecutorError::InstallError(format!("Failed to create install path: {}", e.to_string()))
313        })?;
314
315        // 拷贝构建结果到安装路径
316        let build_dir: PathBuf = self.build_dir.path.clone();
317        FileUtils::copy_dir_all(&build_dir, &install_path)
318            .map_err(|e| ExecutorError::InstallError(e))?;
319        info!("Task {} installed.", self.entity.task().name_version());
320
321        return Ok(());
322    }
323
324    fn clean(&self) -> Result<(), ExecutorError> {
325        let level = if let Action::Clean(l) = self.action {
326            l
327        } else {
328            panic!(
329                "BUG: clean() called with non-clean action. executor details: {:?}",
330                self
331            );
332        };
333        info!(
334            "Cleaning task: {}, level={level:?}",
335            self.entity.task().name_version()
336        );
337
338        let r: Result<(), ExecutorError> = match level {
339            UserCleanLevel::All => self.clean_all(),
340            UserCleanLevel::InSrc => self.clean_src(),
341            UserCleanLevel::Output => {
342                self.clean_target()?;
343                self.clean_cache()
344            }
345        };
346
347        if let Err(e) = r {
348            error!(
349                "Failed to clean task: {}, error message: {:?}",
350                self.entity.task().name_version(),
351                e
352            );
353            return Err(e);
354        }
355
356        return Ok(());
357    }
358
359    fn clean_all(&self) -> Result<(), ExecutorError> {
360        // 在源文件目录执行清理
361        self.clean_src()?;
362        // 清理构建结果
363        self.clean_target()?;
364        // 清理缓存
365        self.clean_cache()?;
366        return Ok(());
367    }
368
369    /// 在源文件目录执行清理
370    fn clean_src(&self) -> Result<(), ExecutorError> {
371        let cmd: Option<Command> = self.create_command()?;
372        if cmd.is_none() {
373            // 如果这里没有命令,则认为用户不需要在源文件目录执行清理
374            return Ok(());
375        }
376        info!(
377            "{}: Cleaning in source directory: {:?}",
378            self.entity.task().name_version(),
379            self.src_work_dir()
380        );
381
382        let cmd = cmd.unwrap();
383        self.run_command(cmd)?;
384        return Ok(());
385    }
386
387    /// 清理构建输出目录
388    fn clean_target(&self) -> Result<(), ExecutorError> {
389        info!(
390            "{}: Cleaning build target directory: {:?}",
391            self.entity.task().name_version(),
392            self.build_dir.path
393        );
394
395        return self.build_dir.remove_self_recursive();
396    }
397
398    /// 清理下载缓存
399    fn clean_cache(&self) -> Result<(), ExecutorError> {
400        let cache_dir = self.source_dir.as_ref();
401        if cache_dir.is_none() {
402            // 如果没有缓存目录,则认为用户不需要清理缓存
403            return Ok(());
404        }
405        info!(
406            "{}: Cleaning cache directory: {}",
407            self.entity.task().name_version(),
408            self.src_work_dir().display()
409        );
410        return cache_dir.unwrap().remove_self_recursive();
411    }
412
413    /// 获取源文件的工作目录
414    fn src_work_dir(&self) -> PathBuf {
415        if let Some(local_path) = self.entity.task().source_path() {
416            return local_path;
417        }
418        return self.source_dir.as_ref().unwrap().path.clone();
419    }
420
421    fn task_log(&self) -> TaskLog {
422        return self.task_data_dir.task_log();
423    }
424
425    /// 为任务创建命令
426    fn create_command(&self) -> Result<Option<Command>, ExecutorError> {
427        // 获取命令
428        let raw_cmd = match self.entity.task().task_type {
429            TaskType::BuildFromSource(_) => match self.action {
430                Action::Build => self.entity.task().build.build_command.clone(),
431                Action::Clean(_) => self.entity.task().clean.clean_command.clone(),
432                _ => unimplemented!(
433                    "create_command: Action {:?} not supported yet.",
434                    self.action
435                ),
436            },
437
438            TaskType::InstallFromPrebuilt(_) => match self.action {
439                Action::Build => self.entity.task().build.build_command.clone(),
440                Action::Clean(_) => self.entity.task().clean.clean_command.clone(),
441                _ => unimplemented!(
442                    "create_command: Action {:?} not supported yet.",
443                    self.action
444                ),
445            },
446        };
447
448        if raw_cmd.is_none() {
449            return Ok(None);
450        }
451
452        let raw_cmd = raw_cmd.unwrap();
453
454        let mut command = Command::new("bash");
455        command.current_dir(self.src_work_dir());
456
457        // 设置参数
458        command.arg("-c");
459        command.arg(raw_cmd);
460
461        // 设置环境变量
462        let env_list = ENV_LIST.read().unwrap();
463        for (key, value) in env_list.envs.iter() {
464            // if key.starts_with("DADK") {
465            //     debug!("DADK env found: {}={}", key, value.value);
466            // }
467            command.env(key, value.value.clone());
468        }
469        drop(env_list);
470        for (key, value) in self.local_envs.envs.iter() {
471            debug!("Local env found: {}={}", key, value.value);
472            command.env(key, value.value.clone());
473        }
474
475        return Ok(Some(command));
476    }
477
478    /// # 准备工作线程本地环境变量
479    fn prepare_local_env(&mut self) -> Result<(), ExecutorError> {
480        let binding = self.entity.task();
481        let task_envs: Option<&Vec<TaskEnv>> = binding.envs.as_ref();
482
483        if let Some(task_envs) = task_envs {
484            for tv in task_envs.iter() {
485                self.local_envs
486                    .add(EnvVar::new(tv.key().to_string(), tv.value().to_string()));
487            }
488        }
489
490        // 添加`DADK_CURRENT_BUILD_DIR`环境变量,便于构建脚本把构建结果拷贝到这里
491        self.local_envs.add(EnvVar::new(
492            "DADK_CURRENT_BUILD_DIR".to_string(),
493            self.build_dir.path.to_str().unwrap().to_string(),
494        ));
495
496        return Ok(());
497    }
498
499    fn prepare_input(&self) -> Result<(), ExecutorError> {
500        // 拉取源文件
501        let task = self.entity.task();
502        match &task.task_type {
503            TaskType::BuildFromSource(cs) => {
504                if self.source_dir.is_none() {
505                    return Ok(());
506                }
507                let source_dir = self.source_dir.as_ref().unwrap();
508                match cs {
509                    CodeSource::Git(git) => {
510                        git.prepare(source_dir)
511                            .map_err(|e| ExecutorError::PrepareEnvError(e))?;
512                    }
513                    // 本地源文件,不需要拉取
514                    CodeSource::Local(_) => return Ok(()),
515                    // 在线压缩包,需要下载
516                    CodeSource::Archive(archive) => {
517                        archive
518                            .download_unzip(source_dir)
519                            .map_err(|e| ExecutorError::PrepareEnvError(e))?;
520                    }
521                }
522            }
523            TaskType::InstallFromPrebuilt(pb) => {
524                match pb {
525                    // 本地源文件,不需要拉取
526                    PrebuiltSource::Local(local_source) => {
527                        let local_path = local_source.path();
528                        let target_path = &self.build_dir.path;
529                        FileUtils::copy_dir_all(&local_path, &target_path)
530                            .map_err(|e| ExecutorError::TaskFailed(e))?; // let mut cmd = "cp -r ".to_string();
531                        return Ok(());
532                    }
533                    // 在线压缩包,需要下载
534                    PrebuiltSource::Archive(archive) => {
535                        archive
536                            .download_unzip(&self.build_dir)
537                            .map_err(|e| ExecutorError::PrepareEnvError(e))?;
538                    }
539                }
540            }
541        }
542
543        return Ok(());
544    }
545
546    fn run_command(&self, mut command: Command) -> Result<(), ExecutorError> {
547        let mut child = command
548            .stdin(Stdio::inherit())
549            .spawn()
550            .map_err(|e| ExecutorError::IoError(e.to_string()))?;
551
552        // 等待子进程结束
553        let r = child
554            .wait()
555            .map_err(|e| ExecutorError::IoError(e.to_string()));
556        debug!("Command finished: {:?}", r);
557        if r.is_ok() {
558            let r = r.unwrap();
559            if r.success() {
560                return Ok(());
561            } else {
562                // 执行失败,获取最后100行stderr输出
563                let errmsg = format!(
564                    "Task {} failed, exit code = {}",
565                    self.entity.task().name_version(),
566                    r.code().unwrap()
567                );
568                error!("{errmsg}");
569                let command_opt = command.output();
570                if command_opt.is_err() {
571                    return Err(ExecutorError::TaskFailed(
572                        "Failed to get command output".to_string(),
573                    ));
574                }
575                let command_opt = command_opt.unwrap();
576                let command_output = String::from_utf8_lossy(&command_opt.stderr);
577                let mut last_100_outputs = command_output
578                    .lines()
579                    .rev()
580                    .take(100)
581                    .collect::<Vec<&str>>();
582                last_100_outputs.reverse();
583                error!("Last 100 lines msg of stderr:");
584                for line in last_100_outputs {
585                    error!("{}", line);
586                }
587                return Err(ExecutorError::TaskFailed(errmsg));
588            }
589        } else {
590            let errmsg = format!(
591                "Task {} failed, msg = {:?}",
592                self.entity.task().name_version(),
593                r.err().unwrap()
594            );
595            error!("{errmsg}");
596            return Err(ExecutorError::TaskFailed(errmsg));
597        }
598    }
599}
600
601#[derive(Debug, Clone)]
602pub struct EnvMap {
603    pub envs: BTreeMap<String, EnvVar>,
604}
605
606impl EnvMap {
607    pub fn new() -> Self {
608        Self {
609            envs: BTreeMap::new(),
610        }
611    }
612
613    pub fn add(&mut self, env: EnvVar) {
614        self.envs.insert(env.key.clone(), env);
615    }
616
617    #[allow(dead_code)]
618    pub fn get(&self, key: &str) -> Option<&EnvVar> {
619        self.envs.get(key)
620    }
621
622    pub fn add_vars(&mut self, vars: Vars) {
623        for (key, value) in vars {
624            self.add(EnvVar::new(key, value));
625        }
626    }
627}
628
629/// # 环境变量
630#[derive(Debug, PartialEq, PartialOrd, Eq, Ord, Clone)]
631pub struct EnvVar {
632    pub key: String,
633    pub value: String,
634}
635
636impl EnvVar {
637    pub fn new(key: String, value: String) -> Self {
638        Self { key, value }
639    }
640}
641
642/// # 任务执行器错误枚举
643#[allow(dead_code)]
644#[derive(Debug, Clone)]
645pub enum ExecutorError {
646    /// 准备执行环境错误
647    PrepareEnvError(String),
648    IoError(String),
649    /// 构建执行错误
650    TaskFailed(String),
651    /// 安装错误
652    InstallError(String),
653    /// 清理错误
654    CleanError(String),
655}
656
657/// # 准备全局环境变量
658pub fn prepare_env(
659    sched_entities: &SchedEntities,
660    execute_ctx: &Arc<DadkUserExecuteContext>,
661) -> Result<(), ExecutorError> {
662    info!("Preparing environment variables...");
663    let env_list = create_global_env_list(sched_entities, execute_ctx)?;
664    // 写入全局环境变量列表
665    let mut global_env_list = ENV_LIST.write().unwrap();
666    *global_env_list = env_list;
667    return Ok(());
668}
669
670/// # 创建全局环境变量列表
671fn create_global_env_list(
672    sched_entities: &SchedEntities,
673    execute_ctx: &Arc<DadkUserExecuteContext>,
674) -> Result<EnvMap, ExecutorError> {
675    let mut env_list = EnvMap::new();
676    let envs: Vars = std::env::vars();
677    env_list.add_vars(envs);
678
679    // 为每个任务创建特定的环境变量
680    for entity in sched_entities.entities().iter() {
681        // 导出任务的构建目录环境变量
682        let build_dir = CacheDir::build_dir(entity.clone())?;
683
684        let build_dir_key = CacheDir::build_dir_env_key(&entity)?;
685        env_list.add(EnvVar::new(
686            build_dir_key,
687            build_dir.to_str().unwrap().to_string(),
688        ));
689
690        // 如果需要源码缓存目录,则导出
691        if CacheDir::need_source_cache(entity) {
692            let source_dir = CacheDir::source_dir(entity.clone())?;
693            let source_dir_key = CacheDir::source_dir_env_key(&entity)?;
694            env_list.add(EnvVar::new(
695                source_dir_key,
696                source_dir.to_str().unwrap().to_string(),
697            ));
698        }
699    }
700
701    // 创建ARCH环境变量
702    let target_arch = execute_ctx.target_arch();
703    env_list.add(EnvVar::new("ARCH".to_string(), (*target_arch).into()));
704
705    return Ok(env_list);
706}
707
708/// # 获取文件最后的更新时间
709///
710/// ## 参数
711/// * `path` - 文件路径
712/// * `last_modified` - 最后的更新时间
713/// * `build_time` - 构建时间
714fn last_modified_time(
715    path: &PathBuf,
716    build_time: &DateTime<Utc>,
717) -> Result<DateTime<Utc>, ExecutorError> {
718    let mut queue = VecDeque::new();
719    queue.push_back(path.clone());
720
721    let mut last_modified = DateTime::<Utc>::from(SystemTime::UNIX_EPOCH);
722
723    while let Some(current_path) = queue.pop_front() {
724        let metadata = current_path
725            .metadata()
726            .map_err(|e| ExecutorError::InstallError(e.to_string()))?;
727
728        if metadata.is_dir() {
729            for r in std::fs::read_dir(&current_path).unwrap() {
730                if let Ok(entry) = r {
731                    // 忽略编译产物目录
732                    if entry.file_name() == "target" {
733                        continue;
734                    }
735
736                    let entry_path = entry.path();
737                    let entry_metadata = entry.metadata().unwrap();
738                    // 比较文件的修改时间和last_modified,取最大值
739                    let file_modified = DateTime::<Utc>::from(entry_metadata.modified().unwrap());
740                    last_modified = std::cmp::max(last_modified, file_modified);
741
742                    // 如果其中某一个文件的修改时间在build_time之后,则直接返回,不用继续搜索
743                    if last_modified > *build_time {
744                        return Ok(last_modified);
745                    }
746
747                    if entry_metadata.is_dir() {
748                        // 如果是子目录,则将其加入队列
749                        queue.push_back(entry_path);
750                    }
751                }
752            }
753        } else {
754            // 如果是文件,直接比较修改时间
755            let file_modified = DateTime::<Utc>::from(metadata.modified().unwrap());
756            last_modified = std::cmp::max(last_modified, file_modified);
757
758            // 如果其中某一个文件的修改时间在build_time之后,则直接返回,不用继续递归
759            if last_modified > *build_time {
760                return Ok(last_modified);
761            }
762        }
763    }
764
765    if last_modified == DateTime::<Utc>::from(SystemTime::UNIX_EPOCH) {
766        return Err(ExecutorError::InstallError(format!(
767            "Failed to get last modified time for path: {}",
768            path.display()
769        )));
770    }
771    Ok(last_modified)
772}