Skip to main content

opal/executor/
core.rs

1mod history_store;
2mod launch;
3mod lifecycle;
4mod preparer;
5mod process;
6mod registry;
7mod runtime_state;
8mod runtime_summary;
9mod stage_tracker;
10mod workspace;
11
12use super::{orchestrator, paths};
13use crate::compiler::compile_pipeline;
14use crate::display::{self, DisplayFormatter, collect_pipeline_plan, print_pipeline_summary};
15use crate::env::{build_job_env, collect_env_vars, expand_env_list};
16use crate::execution_plan::{ExecutableJob, ExecutionPlan, build_execution_plan};
17use crate::history::{HistoryCache, HistoryEntry};
18use crate::logging;
19use crate::model::{ArtifactSourceOutcome, CachePolicySpec, JobSpec, PipelineSpec};
20use crate::naming::generate_run_id;
21use crate::pipeline::{
22    self, ArtifactManager, CacheManager, ExternalArtifactsManager, JobRunInfo, JobSummary,
23    RuleContext,
24};
25use crate::runner::ExecuteContext;
26use crate::secrets::SecretsStore;
27use crate::terminal::should_use_color;
28use crate::ui::{UiBridge, UiHandle, UiJobInfo, UiJobResources};
29use crate::{ExecutorConfig, runtime};
30use anyhow::{Context, Result};
31use std::collections::HashMap;
32use std::env;
33use std::fs;
34use std::path::{Path, PathBuf};
35use std::sync::Arc;
36use tokio::sync::mpsc;
37
/// Base directory inside the job container. The container-side working directory is derived
/// from it as `CONTAINER_ROOT/<name of the host workdir>`, and it is also handed to
/// `build_job_env` when a job's environment is assembled.
pub(super) const CONTAINER_ROOT: &str = "/builds";
39
/// Shared state for one pipeline run: parsed pipeline, on-disk session layout, environment,
/// and the managers (artifacts, cache, history, secrets) the executor submodules operate on.
#[derive(Debug, Clone)]
pub struct ExecutorCore {
    /// Executor configuration this core was constructed from.
    pub config: ExecutorConfig,
    /// Pipeline definition parsed from `config.pipeline`.
    pipeline: PipelineSpec,
    /// Result of `should_use_color()` at construction; feeds the `DisplayFormatter`.
    use_color: bool,
    /// `<session_dir>/scripts`, created during construction.
    scripts_dir: PathBuf,
    /// `runtime::logs_dir(run_id)`, created during construction.
    logs_dir: PathBuf,
    /// `runtime::session_dir(run_id)`; wiped and recreated during construction.
    session_dir: PathBuf,
    /// Container-side counterpart of `session_dir` (`/opal/<run_id>`).
    container_session_dir: PathBuf,
    /// Identifier produced by `generate_run_id` for this run.
    run_id: String,
    /// True when `config.trace_scripts` is set or `OPAL_DEBUG` is `1`/`true`.
    verbose_scripts: bool,
    /// Variables collected from `config.env_includes`, expanded against the process environment.
    env_vars: Vec<(String, String)>,
    /// Process environment, extended with `env_vars` and then with the secrets' env pairs.
    shared_env: HashMap<String, String>,
    /// Container-side counterpart of `config.workdir` (`CONTAINER_ROOT/<workdir name>`).
    container_workdir: PathBuf,
    /// Tracker built from each stage's name and job count.
    stage_tracker: stage_tracker::StageTracker,
    /// Per-run runtime bookkeeping (running containers, cancelled/completed jobs, runtime
    /// objects); starts out as `RuntimeState::default()`.
    runtime_state: runtime_state::RuntimeState,
    /// History store loaded from `runtime::history_path()`.
    history_store: history_store::HistoryStore,
    /// Secrets loaded relative to `config.workdir`.
    secrets: SecretsStore,
    /// Artifact manager rooted at the session directory.
    artifacts: ArtifactManager,
    /// Cache manager rooted at `runtime::cache_root()`.
    cache: CacheManager,
    /// Present only when a GitLab configuration was supplied.
    external_artifacts: Option<ExternalArtifactsManager>,
}
62
/// Per-job resource description computed from the plan before execution. It is converted to
/// `UiJobResources` for the UI and to `HistoryResources` when the run is recorded.
#[derive(Debug, Clone)]
struct JobResourceInfo {
    /// Host directory for the job's artifacts; `None` when the job declares no artifact paths.
    artifact_dir: Option<String>,
    /// Declared artifact paths, rendered as display strings.
    artifact_paths: Vec<String>,
    /// Cache entries described for the job.
    caches: Vec<HistoryCache>,
    // The fields below are left empty at planning time; when history is recorded, values from
    // the runtime state take precedence and these only serve as fallbacks.
    container_name: Option<String>,
    service_network: Option<String>,
    service_containers: Vec<String>,
    runtime_summary_path: Option<String>,
}
73
74impl ExecutorCore {
75    // TODO: this shit does way too much, hard to test if you add fs::create inside of it
76    pub fn new(config: ExecutorConfig) -> Result<Self> {
77        let pipeline =
78            PipelineSpec::from_path_with_gitlab(&config.pipeline, config.gitlab.as_ref())?;
79        let run_id = generate_run_id(&config);
80        let runs_root = runtime::runs_root();
81        fs::create_dir_all(&runs_root)
82            .with_context(|| format!("failed to create {:?}", runs_root))?;
83
84        let session_dir = runtime::session_dir(&run_id);
85        if session_dir.exists() {
86            fs::remove_dir_all(&session_dir)
87                .with_context(|| format!("failed to clean {:?}", session_dir))?;
88        }
89        fs::create_dir_all(&session_dir)
90            .with_context(|| format!("failed to create {:?}", session_dir))?;
91
92        let scripts_dir = session_dir.join("scripts");
93        fs::create_dir_all(&scripts_dir)
94            .with_context(|| format!("failed to create {:?}", scripts_dir))?;
95
96        let logs_dir = runtime::logs_dir(&run_id);
97        fs::create_dir_all(&logs_dir)
98            .with_context(|| format!("failed to create {:?}", logs_dir))?;
99
100        let history_store = history_store::HistoryStore::load(runtime::history_path());
101
102        let use_color = should_use_color();
103        let env_verbose = env::var_os("OPAL_DEBUG")
104            .map(|val| {
105                let s = val.to_string_lossy();
106                s == "1" || s.eq_ignore_ascii_case("true")
107            })
108            .unwrap_or(false);
109        let verbose_scripts = config.trace_scripts || env_verbose;
110        let mut env_vars = collect_env_vars(&config.env_includes)?;
111        let mut shared_env: HashMap<String, String> = env::vars().collect();
112        expand_env_list(&mut env_vars[..], &shared_env);
113        shared_env.extend(env_vars.iter().cloned());
114        let stage_specs: Vec<(String, usize)> = pipeline
115            .stages
116            .iter()
117            .map(|stage| (stage.name.clone(), stage.jobs.len()))
118            .collect();
119        let stage_tracker = stage_tracker::StageTracker::new(&stage_specs);
120
121        let secrets = SecretsStore::load(&config.workdir)?;
122        shared_env.extend(secrets.env_pairs());
123        let artifacts = ArtifactManager::new(session_dir.clone());
124        let cache_root = runtime::cache_root();
125        fs::create_dir_all(&cache_root)
126            .with_context(|| format!("failed to create cache root {:?}", cache_root))?;
127        let cache = CacheManager::new(cache_root);
128        let external_artifacts = config.gitlab.as_ref().map(|cfg| {
129            ExternalArtifactsManager::new(
130                session_dir.clone(),
131                cfg.base_url.clone(),
132                cfg.token.clone(),
133            )
134        });
135        let project_dir = config
136            .workdir
137            .file_name()
138            .and_then(|n| n.to_str())
139            .unwrap_or("project");
140        let container_workdir = Path::new(CONTAINER_ROOT).join(project_dir);
141        let container_session_dir = Path::new("/opal").join(&run_id);
142
143        let core = Self {
144            config,
145            pipeline,
146            use_color,
147            scripts_dir,
148            logs_dir,
149            session_dir,
150            container_session_dir,
151            run_id,
152            verbose_scripts,
153            env_vars,
154            shared_env,
155            container_workdir,
156            stage_tracker,
157            runtime_state: runtime_state::RuntimeState::default(),
158            history_store,
159            secrets,
160            artifacts,
161            cache,
162            external_artifacts,
163        };
164
165        registry::ensure_registry_logins(&core)?;
166
167        Ok(core)
168    }
169
    /// Plans and executes the pipeline, driving either the TUI or plain-text output.
    ///
    /// Returns the result reported by `orchestrator::execute_plan`, after history has been
    /// recorded and (without the TUI) the summary printed.
    ///
    /// # Errors
    /// Fails early if planning or starting the UI fails, or if handling restart commands fails.
    // NOTE(review): an error from `handle_restart_commands` returns via `?` before history is
    // recorded, before `wait_for_exit`, and discards `result` — confirm this is intended.
    pub async fn run(&self) -> Result<()> {
        let plan = Arc::new(self.plan_jobs()?);
        let resource_map = self.collect_job_resources(&plan);
        let display = self.display();
        let plan_text = collect_pipeline_plan(&display, &plan).join("\n");
        let ui_resources = Self::convert_ui_resources(&resource_map);
        let history_snapshot = self.history_store.snapshot();
        // With the TUI enabled, start it with one entry per planned job, in plan order.
        let ui_handle = if self.config.enable_tui {
            let jobs: Vec<UiJobInfo> = plan
                .ordered
                .iter()
                .filter_map(|name| plan.nodes.get(name))
                .map(|planned| UiJobInfo {
                    name: planned.instance.job.name.clone(),
                    stage: planned.instance.stage_name.clone(),
                    log_path: planned.log_path.clone(),
                    log_hash: planned.log_hash.clone(),
                })
                .collect();
            Some(UiHandle::start(
                jobs,
                history_snapshot,
                self.run_id.clone(),
                ui_resources,
                plan_text,
                self.config.workdir.clone(),
            )?)
        } else {
            None
        };
        // Without the TUI, create our own command channel. If OPAL_ABORT_AFTER_SECS parses to a
        // positive integer, a background task sends `AbortPipeline` after that many seconds;
        // otherwise the sender is dropped at the end of this block.
        let mut owned_command_rx = if !self.config.enable_tui {
            let (tx, rx) = mpsc::unbounded_channel();
            if let Ok(raw) = env::var("OPAL_ABORT_AFTER_SECS")
                && let Ok(seconds) = raw.parse::<u64>()
                && seconds > 0
            {
                tokio::spawn(async move {
                    tokio::time::sleep(std::time::Duration::from_secs(seconds)).await;
                    let _ = tx.send(crate::ui::UiCommand::AbortPipeline);
                });
            }
            Some(rx)
        } else {
            None
        };
        // Prefer the UI's command receiver; fall back to the locally owned one.
        let mut ui_command_rx = ui_handle
            .as_ref()
            .and_then(|handle| handle.command_receiver());
        let command_rx = ui_command_rx.as_mut().or(owned_command_rx.as_mut());
        let ui_bridge = ui_handle.as_ref().map(|handle| Arc::new(handle.bridge()));

        let (mut summaries, result) =
            orchestrator::execute_plan(self, plan.clone(), ui_bridge.clone(), command_rx).await;

        if let Some(handle) = &ui_handle {
            handle.pipeline_finished();
        }

        // Restart commands are only processed when a UI command receiver exists.
        if let Some(commands) = ui_command_rx.as_mut() {
            orchestrator::handle_restart_commands(
                self,
                plan.clone(),
                ui_bridge.clone(),
                commands,
                &mut summaries,
            )
            .await?;
        }

        let history_entry = self.record_pipeline_history(&summaries, &resource_map);
        if let (Some(entry), Some(ui)) = (history_entry, ui_bridge.as_deref()) {
            ui.history_updated(entry);
        }

        if let Some(handle) = ui_handle {
            handle.wait_for_exit();
        }

        // Plain-text mode prints the summary itself.
        if !self.config.enable_tui {
            print_pipeline_summary(
                &display,
                &plan,
                &summaries,
                &self.session_dir,
                display::print_line,
            );
        }
        result
    }
259
260    fn plan_jobs(&self) -> Result<ExecutionPlan> {
261        let ctx = RuleContext::new(&self.config.workdir);
262        ctx.ensure_valid_tag_context()?;
263        if !pipeline::rules::filters_allow(&self.pipeline.filters, &ctx) {
264            return Ok(empty_execution_plan());
265        }
266        if let Some(workflow) = &self.pipeline.workflow
267            && !pipeline::rules::evaluate_workflow(&workflow.rules, &ctx)?
268        {
269            return Ok(empty_execution_plan());
270        }
271        let compiled = compile_pipeline(&self.pipeline, Some(&ctx))?;
272        let mut plan = build_execution_plan(compiled, |job| self.job_log_info(job))?;
273        if !self.config.selected_jobs.is_empty() {
274            plan = plan.select_jobs(&self.config.selected_jobs)?;
275        }
276        Ok(plan)
277    }
278
279    fn collect_job_resources(&self, plan: &ExecutionPlan) -> HashMap<String, JobResourceInfo> {
280        plan.nodes
281            .values()
282            .map(|planned| {
283                // TODO: 50 lines of code inside a map, when this explodes, what happens - refactor
284                let artifact_dir = if planned.instance.job.artifacts.paths.is_empty() {
285                    None
286                } else {
287                    Some(
288                        self.artifacts
289                            .job_artifacts_root(&planned.instance.job.name)
290                            .display()
291                            .to_string(),
292                    )
293                };
294                let artifact_paths = planned
295                    .instance
296                    .job
297                    .artifacts
298                    .paths
299                    .iter()
300                    .map(|path| path.display().to_string())
301                    .collect();
302                let env_vars = self.job_env(&planned.instance.job);
303                let cache_env: HashMap<String, String> = env_vars.iter().cloned().collect();
304                let caches = self
305                    .cache
306                    .describe_entries(
307                        &planned.instance.job.cache,
308                        &self.config.workdir,
309                        &cache_env,
310                    )
311                    .into_iter()
312                    .map(|entry| HistoryCache {
313                        key: entry.key,
314                        policy: cache_policy_label(entry.policy).to_string(),
315                        host: entry.host.display().to_string(),
316                        paths: entry
317                            .paths
318                            .iter()
319                            .map(|path| path.display().to_string())
320                            .collect(),
321                    })
322                    .collect();
323                (
324                    planned.instance.job.name.clone(),
325                    JobResourceInfo {
326                        artifact_dir,
327                        artifact_paths,
328                        caches,
329                        container_name: None,
330                        service_network: None,
331                        service_containers: Vec::new(),
332                        runtime_summary_path: None,
333                    },
334                )
335            })
336            .collect()
337    }
338
339    fn convert_ui_resources(
340        resources: &HashMap<String, JobResourceInfo>,
341    ) -> HashMap<String, UiJobResources> {
342        resources
343            .iter()
344            .map(|(name, info)| {
345                (
346                    name.clone(),
347                    UiJobResources {
348                        artifact_dir: info.artifact_dir.clone(),
349                        artifact_paths: info.artifact_paths.clone(),
350                        caches: info.caches.clone(),
351                        container_name: info.container_name.clone(),
352                        service_network: info.service_network.clone(),
353                        service_containers: info.service_containers.clone(),
354                        runtime_summary_path: info.runtime_summary_path.clone(),
355                    },
356                )
357            })
358            .collect()
359    }
360
361    fn record_pipeline_history(
362        &self,
363        summaries: &[JobSummary],
364        resources: &HashMap<String, JobResourceInfo>,
365    ) -> Option<HistoryEntry> {
366        let resource_map = resources
367            .iter()
368            .map(|(name, info)| {
369                let runtime = self.runtime_state.runtime_objects(name);
370                (
371                    name.clone(),
372                    history_store::HistoryResources {
373                        artifact_dir: info.artifact_dir.clone(),
374                        artifacts: info.artifact_paths.clone(),
375                        caches: info.caches.clone(),
376                        container_name: runtime
377                            .as_ref()
378                            .and_then(|objects| objects.container_name.clone())
379                            .or_else(|| info.container_name.clone()),
380                        service_network: runtime
381                            .as_ref()
382                            .and_then(|objects| objects.service_network.clone())
383                            .or_else(|| info.service_network.clone()),
384                        service_containers: runtime
385                            .as_ref()
386                            .map(|objects| objects.service_containers.clone())
387                            .unwrap_or_else(|| info.service_containers.clone()),
388                        runtime_summary_path: runtime
389                            .as_ref()
390                            .and_then(|objects| objects.runtime_summary_path.clone())
391                            .or_else(|| info.runtime_summary_path.clone()),
392                    },
393                )
394            })
395            .collect();
396        self.history_store
397            .record(&self.run_id, summaries, &resource_map)
398    }
399
    /// Stores the container, service network, service containers and runtime summary path
    /// observed for `job_name` in the runtime state (forwarded unchanged).
    pub(crate) fn record_runtime_objects(
        &self,
        job_name: &str,
        container_name: String,
        service_network: Option<String>,
        service_containers: Vec<String>,
        runtime_summary_path: Option<String>,
    ) {
        self.runtime_state.record_runtime_objects(
            job_name,
            container_name,
            service_network,
            service_containers,
            runtime_summary_path,
        );
    }
416
    /// Delegates to `runtime_summary::write_runtime_summary` for `job_name`, returning its
    /// result (an optional string on success) unchanged.
    pub(crate) fn write_runtime_summary(
        &self,
        job_name: &str,
        container_name: &str,
        service_network: Option<&str>,
        service_containers: &[String],
    ) -> Result<Option<String>> {
        runtime_summary::write_runtime_summary(
            self,
            job_name,
            container_name,
            service_network,
            service_containers,
        )
    }
432
    /// Delegates to `launch::log_job_start` for `planned`, optionally notifying `ui`, and
    /// returns the resulting `JobRunInfo`.
    pub(crate) fn log_job_start(
        &self,
        planned: &ExecutableJob,
        ui: Option<&UiBridge>,
    ) -> Result<JobRunInfo> {
        launch::log_job_start(self, planned, ui)
    }
440
    /// Delegates to `preparer::prepare_job_run` to prepare `job` within `plan`.
    pub(crate) fn prepare_job_run(
        &self,
        plan: &ExecutionPlan,
        job: &JobSpec,
    ) -> Result<preparer::PreparedJobRun> {
        preparer::prepare_job_run(self, plan, job)
    }
448
    /// Forwards to `ArtifactManager::collect_untracked` for `job` in `workspace`.
    pub(crate) fn collect_untracked_artifacts(
        &self,
        job: &JobSpec,
        workspace: &Path,
    ) -> Result<()> {
        self.artifacts.collect_untracked(job, workspace)
    }
456
    /// Forwards to `ArtifactManager::collect_declared` for `job` in `workspace`.
    pub(crate) fn collect_declared_artifacts(&self, job: &JobSpec, workspace: &Path) -> Result<()> {
        self.artifacts.collect_declared(job, workspace)
    }
460
    /// Forwards to `ArtifactManager::collect_dotenv_report` for `job` in `workspace`.
    pub(crate) fn collect_dotenv_artifacts(&self, job: &JobSpec, workspace: &Path) -> Result<()> {
        self.artifacts.collect_dotenv_report(job, workspace)
    }
464
    /// Clears the running-container record for `job_name` in the runtime state.
    pub(crate) fn clear_running_container(&self, job_name: &str) {
        self.runtime_state.clear_running_container(job_name);
    }
468
    /// Records `outcome` as the completion result of `job_name` in the runtime state.
    pub(crate) fn record_completed_job(&self, job_name: &str, outcome: ArtifactSourceOutcome) {
        self.runtime_state.record_completed_job(job_name, outcome);
    }
472
    /// Returns the runtime state's map of completed job names to their outcomes.
    pub(crate) fn completed_jobs(&self) -> HashMap<String, ArtifactSourceOutcome> {
        self.runtime_state.completed_jobs()
    }
476
    /// Forwards to `RuntimeState::take_cancelled_job` for `job_name`.
    // NOTE(review): presumably returns whether the job was marked cancelled and clears the
    // mark — confirm against `runtime_state`.
    pub(crate) fn take_cancelled_job(&self, job_name: &str) -> bool {
        self.runtime_state.take_cancelled_job(job_name)
    }
480
481    pub(crate) fn cancel_running_job(&self, job_name: &str) -> bool {
482        let container = self.runtime_state.running_container(job_name);
483        if let Some(container_name) = container {
484            self.runtime_state.mark_job_cancelled(job_name);
485            self.kill_container(job_name, &container_name);
486            true
487        } else {
488            false
489        }
490    }
491
    /// Delegates to `process::execute` with this core and the given context.
    pub(crate) fn execute(&self, ctx: ExecuteContext<'_>) -> Result<()> {
        process::execute(self, ctx)
    }
495
496    pub(crate) fn print_job_completion(
497        &self,
498        stage_name: &str,
499        script_path: &Path,
500        log_path: &Path,
501        elapsed: f32,
502    ) {
503        if !self.config.enable_tui {
504            let display = self.display();
505            display::print_line(format!("    script stored at {}", script_path.display()));
506            display::print_line(format!("    log file stored at {}", log_path.display()));
507            let finish_label = display.bold_green("    ✓ finished in");
508            display::print_line(format!("{} {:.2}s", finish_label, elapsed));
509
510            if let Some(stage_elapsed) = self.stage_tracker.complete_job(stage_name) {
511                let stage_footer = display.bold_blue("╰─ stage complete in");
512                display::print_line(format!("{stage_footer} {:.2}s", stage_elapsed));
513            }
514        }
515    }
516
    /// Delegates to `lifecycle::kill_container` for the given job and container.
    pub(crate) fn kill_container(&self, job_name: &str, container_name: &str) {
        lifecycle::kill_container(self, job_name, container_name);
    }
520
    /// Delegates to `lifecycle::cleanup_finished_container` for `container_name`.
    pub(crate) fn cleanup_finished_container(&self, container_name: &str) {
        lifecycle::cleanup_finished_container(self, container_name);
    }
524
    /// Delegates to `launch::resolve_job_image_with_env` to resolve the image for `job`,
    /// passing along the optional `env_lookup` map.
    fn resolve_job_image_with_env(
        &self,
        job: &JobSpec,
        env_lookup: Option<&HashMap<String, String>>,
    ) -> Result<crate::model::ImageSpec> {
        launch::resolve_job_image_with_env(self, job, env_lookup)
    }
532
    /// Builds the environment for `job` via `build_job_env`, supplying the collected env vars,
    /// the pipeline's default variables, the secrets store, the host and container working
    /// directories, the container root, the run id and the shared environment.
    fn job_env(&self, job: &JobSpec) -> Vec<(String, String)> {
        build_job_env(
            &self.env_vars,
            &self.pipeline.defaults.variables,
            job,
            &self.secrets,
            &self.config.workdir,
            &self.container_workdir,
            Path::new(CONTAINER_ROOT),
            &self.run_id,
            &self.shared_env,
        )
    }
546
547    pub(crate) fn expanded_environment(
548        &self,
549        job: &JobSpec,
550    ) -> Option<crate::model::EnvironmentSpec> {
551        let environment = job.environment.as_ref()?;
552        let lookup: HashMap<String, String> = self.job_env(job).into_iter().collect();
553        Some(crate::env::expand_environment(environment, &lookup))
554    }
555
    /// Creates a `DisplayFormatter` honouring the colour setting captured at construction.
    fn display(&self) -> DisplayFormatter {
        DisplayFormatter::new(self.use_color)
    }
559
    /// Delegates to `logging::job_log_info` with this run's logs directory and run id,
    /// returning a path/string pair for `job`.
    // NOTE(review): callers store the pair as `log_path` / `log_hash` — confirm in `logging`.
    fn job_log_info(&self, job: &JobSpec) -> (PathBuf, String) {
        logging::job_log_info(&self.logs_dir, &self.run_id, job)
    }
563
    /// Maps `host_path` to its container-side path via `paths::to_container_path`, using two
    /// host-to-container pairs: the session directory and the working directory.
    fn container_path_rel(&self, host_path: &Path) -> Result<PathBuf> {
        paths::to_container_path(
            host_path,
            &[
                // Listed in this order: session dir first, then the project workdir.
                (&*self.session_dir, &*self.container_session_dir),
                (&*self.config.workdir, &*self.container_workdir),
            ],
        )
    }
573}
574
/// Builds an `ExecutionPlan` with no jobs: every collection field is empty.
/// Returned by `plan_jobs` when filters or workflow rules reject the run.
fn empty_execution_plan() -> ExecutionPlan {
    ExecutionPlan {
        ordered: Vec::new(),
        nodes: HashMap::new(),
        dependents: HashMap::new(),
        order_index: HashMap::new(),
        variants: HashMap::new(),
    }
}
584
585fn cache_policy_label(policy: CachePolicySpec) -> &'static str {
586    match policy {
587        CachePolicySpec::Pull => "pull",
588        CachePolicySpec::Push => "push",
589        CachePolicySpec::PullPush => "pull-push",
590    }
591}
592
#[cfg(test)]
mod tests {
    // Empty for now: ExecutorCore-specific unit coverage lives in the child modules while the
    // phase 3 extraction continues.
}