Skip to main content

opal/executor/
core.rs

1mod history_store;
2mod launch;
3mod lifecycle;
4mod preparer;
5mod process;
6mod registry;
7mod runtime_state;
8mod runtime_summary;
9mod stage_tracker;
10mod workspace;
11
12use super::{orchestrator, paths};
13use crate::ai::{self, AiContext, AiProviderKind, AiRequest, render_job_analysis_prompt};
14use crate::compiler::compile_pipeline;
15use crate::display::{self, DisplayFormatter, collect_pipeline_plan, print_pipeline_summary};
16use crate::env::{build_job_env, collect_env_vars, expand_env_list};
17use crate::execution_plan::{ExecutableJob, ExecutionPlan, build_execution_plan};
18use crate::executor::container_arch::{container_arch_from_platform, normalize_container_arch};
19use crate::history::{HistoryCache, HistoryEntry};
20use crate::logging;
21use crate::model::{ArtifactSourceOutcome, CachePolicySpec, JobSpec, PipelineSpec};
22use crate::naming::{generate_run_id, job_name_slug};
23use crate::pipeline::{
24    self, ArtifactManager, CacheManager, ExternalArtifactsManager, JobRunInfo, JobSummary,
25    RuleContext,
26};
27use crate::runner::ExecuteContext;
28use crate::secrets::SecretsStore;
29use crate::terminal::should_use_color;
30use crate::ui::{UiBridge, UiHandle, UiJobInfo, UiJobResources};
31use crate::{EngineKind, ExecutorConfig, runtime};
32use anyhow::{Context, Result};
33use std::collections::{BTreeSet, HashMap};
34use std::env;
35use std::fs;
36use std::path::{Path, PathBuf};
37use std::sync::Arc;
38use tokio::sync::mpsc;
39
/// Root path inside the job container; the project directory name is joined onto it
/// to form the container working directory, and it is also passed to `build_job_env`.
pub(super) const CONTAINER_ROOT: &str = "/builds";
41
/// Shared state for one pipeline run: parsed pipeline, per-run directories,
/// environment, and the managers used while jobs execute.
#[derive(Debug, Clone)]
pub struct ExecutorCore {
    /// Executor configuration supplied by the caller of [`ExecutorCore::new`].
    pub config: ExecutorConfig,
    /// Pipeline parsed from `config.pipeline`.
    pipeline: PipelineSpec,
    /// Result of `should_use_color()` at construction time.
    use_color: bool,
    /// `<session_dir>/scripts`.
    scripts_dir: PathBuf,
    /// Log directory for this run (`runtime::logs_dir(run_id)`).
    logs_dir: PathBuf,
    /// Host-side session directory for this run; wiped and recreated by `new`.
    session_dir: PathBuf,
    /// `/opal/<run_id>`.
    container_session_dir: PathBuf,
    /// Identifier produced by `generate_run_id` for this run.
    run_id: String,
    /// True when `config.trace_scripts` is set or `OPAL_DEBUG` is `1`/`true`.
    verbose_scripts: bool,
    /// Variables collected from `config.env_includes`, expanded against the process environment.
    env_vars: Vec<(String, String)>,
    /// Process environment overlaid with `env_vars` and the secrets' env pairs.
    shared_env: HashMap<String, String>,
    /// `CONTAINER_ROOT/<workdir directory name>` (falls back to `project`).
    container_workdir: PathBuf,
    /// Tracker initialised with each stage's name and job count.
    stage_tracker: stage_tracker::StageTracker,
    /// Per-job runtime bookkeeping; starts from `Default`.
    runtime_state: runtime_state::RuntimeState,
    /// History loaded from `runtime::history_path()`.
    history_store: history_store::HistoryStore,
    /// Secrets loaded from `config.workdir`.
    secrets: SecretsStore,
    /// Artifact manager rooted at `session_dir`.
    artifacts: ArtifactManager,
    /// Cache manager rooted at `runtime::cache_root()`.
    cache: CacheManager,
    /// Present only when `config.gitlab` is configured.
    external_artifacts: Option<ExternalArtifactsManager>,
}
64
/// Per-job resource description computed before the run and later shared with the UI
/// and the history store.
#[derive(Debug, Clone)]
struct JobResourceInfo {
    /// Artifact root for the job; `None` when the job declares no artifact paths.
    artifact_dir: Option<String>,
    /// Declared artifact paths rendered as strings.
    artifact_paths: Vec<String>,
    /// Cache entries described by the cache manager for this job.
    caches: Vec<HistoryCache>,
    /// Left as `None` at planning time; history recording prefers the value from runtime state.
    container_name: Option<String>,
    /// Left as `None` at planning time; history recording prefers the value from runtime state.
    service_network: Option<String>,
    /// Left empty at planning time; history recording prefers the value from runtime state.
    service_containers: Vec<String>,
    /// Left as `None` at planning time; history recording prefers the value from runtime state.
    runtime_summary_path: Option<String>,
    /// Names (not values) of the job's user-visible environment variables.
    env_vars: Vec<String>,
}
76
77impl ExecutorCore {
78    // TODO: this shit does way too much, hard to test if you add fs::create inside of it
79    pub fn new(config: ExecutorConfig) -> Result<Self> {
80        let pipeline =
81            PipelineSpec::from_path_with_gitlab(&config.pipeline, config.gitlab.as_ref())?;
82        let run_id = generate_run_id(&config);
83        let runs_root = runtime::runs_root();
84        fs::create_dir_all(&runs_root)
85            .with_context(|| format!("failed to create {:?}", runs_root))?;
86
87        let session_dir = runtime::session_dir(&run_id);
88        if session_dir.exists() {
89            fs::remove_dir_all(&session_dir)
90                .with_context(|| format!("failed to clean {:?}", session_dir))?;
91        }
92        fs::create_dir_all(&session_dir)
93            .with_context(|| format!("failed to create {:?}", session_dir))?;
94
95        let scripts_dir = session_dir.join("scripts");
96        fs::create_dir_all(&scripts_dir)
97            .with_context(|| format!("failed to create {:?}", scripts_dir))?;
98
99        let logs_dir = runtime::logs_dir(&run_id);
100        fs::create_dir_all(&logs_dir)
101            .with_context(|| format!("failed to create {:?}", logs_dir))?;
102
103        let history_store = history_store::HistoryStore::load(runtime::history_path());
104
105        let use_color = should_use_color();
106        let env_verbose = env::var_os("OPAL_DEBUG")
107            .map(|val| {
108                let s = val.to_string_lossy();
109                s == "1" || s.eq_ignore_ascii_case("true")
110            })
111            .unwrap_or(false);
112        let verbose_scripts = config.trace_scripts || env_verbose;
113        let mut env_vars = collect_env_vars(&config.env_includes)?;
114        let mut shared_env: HashMap<String, String> = env::vars().collect();
115        expand_env_list(&mut env_vars[..], &shared_env);
116        shared_env.extend(env_vars.iter().cloned());
117        let stage_specs: Vec<(String, usize)> = pipeline
118            .stages
119            .iter()
120            .map(|stage| (stage.name.clone(), stage.jobs.len()))
121            .collect();
122        let stage_tracker = stage_tracker::StageTracker::new(&stage_specs);
123
124        let secrets = SecretsStore::load(&config.workdir)?;
125        shared_env.extend(secrets.env_pairs());
126        let artifacts = ArtifactManager::new(session_dir.clone());
127        let cache_root = runtime::cache_root();
128        fs::create_dir_all(&cache_root)
129            .with_context(|| format!("failed to create cache root {:?}", cache_root))?;
130        let cache = CacheManager::new(cache_root);
131        let external_artifacts = config.gitlab.as_ref().map(|cfg| {
132            ExternalArtifactsManager::new(
133                session_dir.clone(),
134                cfg.base_url.clone(),
135                cfg.token.clone(),
136            )
137        });
138        let project_dir = config
139            .workdir
140            .file_name()
141            .and_then(|n| n.to_str())
142            .unwrap_or("project");
143        let container_workdir = Path::new(CONTAINER_ROOT).join(project_dir);
144        let container_session_dir = Path::new("/opal").join(&run_id);
145
146        let core = Self {
147            config,
148            pipeline,
149            use_color,
150            scripts_dir,
151            logs_dir,
152            session_dir,
153            container_session_dir,
154            run_id,
155            verbose_scripts,
156            env_vars,
157            shared_env,
158            container_workdir,
159            stage_tracker,
160            runtime_state: runtime_state::RuntimeState::default(),
161            history_store,
162            secrets,
163            artifacts,
164            cache,
165            external_artifacts,
166        };
167
168        registry::ensure_registry_logins(&core)?;
169
170        Ok(core)
171    }
172
    /// Plans and executes the pipeline, then records history and (without the TUI)
    /// prints a summary.
    ///
    /// When `config.enable_tui` is set a UI is started and its command receiver is
    /// handed to the orchestrator; otherwise a local channel is created so that
    /// `OPAL_ABORT_AFTER_SECS` can inject an abort command after the given delay.
    ///
    /// Returns the result reported by `orchestrator::execute_plan`. Errors from
    /// planning, UI start-up or restart handling are propagated immediately.
    pub async fn run(&self) -> Result<()> {
        let plan = Arc::new(self.plan_jobs()?);
        let resource_map = self.collect_job_resources(&plan);
        let display = self.display();
        let plan_text = collect_pipeline_plan(&display, &plan).join("\n");
        let ui_resources = Self::convert_ui_resources(&resource_map);
        let history_snapshot = self.history_store.snapshot();
        let ui_handle = if self.config.enable_tui {
            // Map each variant job name back to the source job it was generated from.
            let variant_sources: HashMap<String, String> = plan
                .variants
                .iter()
                .flat_map(|(source, variants)| {
                    variants
                        .iter()
                        .map(move |variant| (variant.name.clone(), source.clone()))
                })
                .collect();
            // Jobs are listed in plan order; names missing from `plan.nodes` are skipped.
            let jobs: Vec<UiJobInfo> = plan
                .ordered
                .iter()
                .filter_map(|name| plan.nodes.get(name))
                .map(|planned| UiJobInfo {
                    name: planned.instance.job.name.clone(),
                    source_name: variant_sources
                        .get(&planned.instance.job.name)
                        .cloned()
                        .unwrap_or_else(|| planned.instance.job.name.clone()),
                    stage: planned.instance.stage_name.clone(),
                    log_path: planned.log_path.clone(),
                    log_hash: planned.log_hash.clone(),
                    runner: self.ui_runner_info_for_job(&planned.instance.job),
                })
                .collect();
            Some(UiHandle::start(
                jobs,
                history_snapshot,
                self.run_id.clone(),
                ui_resources,
                plan_text,
                self.config.workdir.clone(),
                self.config.pipeline.clone(),
            )?)
        } else {
            None
        };
        // Without the TUI, create a local command channel; a positive
        // OPAL_ABORT_AFTER_SECS schedules an abort command on it.
        let mut owned_command_rx = if !self.config.enable_tui {
            let (tx, rx) = mpsc::unbounded_channel();
            if let Ok(raw) = env::var("OPAL_ABORT_AFTER_SECS")
                && let Ok(seconds) = raw.parse::<u64>()
                && seconds > 0
            {
                tokio::spawn(async move {
                    tokio::time::sleep(std::time::Duration::from_secs(seconds)).await;
                    let _ = tx.send(crate::ui::UiCommand::AbortPipeline);
                });
            }
            Some(rx)
        } else {
            None
        };
        // Prefer the UI's command receiver; fall back to the local one.
        let mut ui_command_rx = ui_handle
            .as_ref()
            .and_then(|handle| handle.command_receiver());
        let command_rx = ui_command_rx.as_mut().or(owned_command_rx.as_mut());
        let ui_bridge = ui_handle.as_ref().map(|handle| Arc::new(handle.bridge()));

        let (mut summaries, result) =
            orchestrator::execute_plan(self, plan.clone(), ui_bridge.clone(), command_rx).await;

        if let Some(handle) = &ui_handle {
            handle.pipeline_finished();
        }

        // With a UI attached, keep serving restart commands after the first pass.
        if let Some(commands) = ui_command_rx.as_mut() {
            orchestrator::handle_restart_commands(
                self,
                plan.clone(),
                ui_bridge.clone(),
                commands,
                &mut summaries,
            )
            .await?;
        }

        let history_entry = self.record_pipeline_history(&summaries, &resource_map);
        if let (Some(entry), Some(ui)) = (history_entry, ui_bridge.as_deref()) {
            ui.history_updated(entry);
        }

        if let Some(handle) = ui_handle {
            handle.wait_for_exit();
        }

        if !self.config.enable_tui {
            print_pipeline_summary(
                &display,
                &plan,
                &summaries,
                &self.session_dir,
                display::print_line,
            );
        }
        result
    }
277
278    fn plan_jobs(&self) -> Result<ExecutionPlan> {
279        let run_manual = env::var("OPAL_RUN_MANUAL").is_ok_and(|v| v == "1");
280        let ctx = RuleContext::from_env(&self.config.workdir, self.shared_env.clone(), run_manual);
281        ctx.ensure_valid_tag_context()?;
282        if !pipeline::rules::filters_allow(&self.pipeline.filters, &ctx) {
283            return Ok(empty_execution_plan());
284        }
285        if let Some(workflow) = &self.pipeline.workflow
286            && !pipeline::rules::evaluate_workflow(&workflow.rules, &ctx)?
287        {
288            return Ok(empty_execution_plan());
289        }
290        let compiled = compile_pipeline(&self.pipeline, Some(&ctx))?;
291        let mut plan = build_execution_plan(compiled, |job| self.job_log_info(job))?;
292        if !self.config.selected_jobs.is_empty() {
293            plan = plan.select_jobs(&self.config.selected_jobs)?;
294        }
295        Ok(plan)
296    }
297
298    fn collect_job_resources(&self, plan: &ExecutionPlan) -> HashMap<String, JobResourceInfo> {
299        plan.nodes
300            .values()
301            .map(|planned| {
302                // TODO: 50 lines of code inside a map, when this explodes, what happens - refactor
303                let artifact_dir = if planned.instance.job.artifacts.paths.is_empty() {
304                    None
305                } else {
306                    Some(
307                        self.artifacts
308                            .job_artifacts_root(&planned.instance.job.name)
309                            .display()
310                            .to_string(),
311                    )
312                };
313                let artifact_paths = planned
314                    .instance
315                    .job
316                    .artifacts
317                    .paths
318                    .iter()
319                    .map(|path| path.display().to_string())
320                    .collect();
321                let env_vars = self.job_env(&planned.instance.job);
322                let cache_env: HashMap<String, String> = env_vars.iter().cloned().collect();
323                let caches = self
324                    .cache
325                    .describe_entries(
326                        &planned.instance.job.cache,
327                        &self.config.workdir,
328                        &cache_env,
329                    )
330                    .into_iter()
331                    .map(|entry| HistoryCache {
332                        key: entry.key,
333                        policy: cache_policy_label(entry.policy).to_string(),
334                        host: entry.host.display().to_string(),
335                        paths: entry
336                            .paths
337                            .iter()
338                            .map(|path| path.display().to_string())
339                            .collect(),
340                    })
341                    .collect();
342                (
343                    planned.instance.job.name.clone(),
344                    JobResourceInfo {
345                        artifact_dir,
346                        artifact_paths,
347                        caches,
348                        container_name: None,
349                        service_network: None,
350                        service_containers: Vec::new(),
351                        runtime_summary_path: None,
352                        env_vars: self.visible_job_env_vars(&planned.instance.job),
353                    },
354                )
355            })
356            .collect()
357    }
358
359    fn convert_ui_resources(
360        resources: &HashMap<String, JobResourceInfo>,
361    ) -> HashMap<String, UiJobResources> {
362        resources
363            .iter()
364            .map(|(name, info)| {
365                (
366                    name.clone(),
367                    UiJobResources {
368                        artifact_dir: info.artifact_dir.clone(),
369                        artifact_paths: info.artifact_paths.clone(),
370                        caches: info.caches.clone(),
371                        container_name: info.container_name.clone(),
372                        service_network: info.service_network.clone(),
373                        service_containers: info.service_containers.clone(),
374                        runtime_summary_path: info.runtime_summary_path.clone(),
375                        env_vars: info.env_vars.clone(),
376                    },
377                )
378            })
379            .collect()
380    }
381
382    fn record_pipeline_history(
383        &self,
384        summaries: &[JobSummary],
385        resources: &HashMap<String, JobResourceInfo>,
386    ) -> Option<HistoryEntry> {
387        let resource_map = resources
388            .iter()
389            .map(|(name, info)| {
390                let runtime = self.runtime_state.runtime_objects(name);
391                (
392                    name.clone(),
393                    history_store::HistoryResources {
394                        artifact_dir: info.artifact_dir.clone(),
395                        artifacts: info.artifact_paths.clone(),
396                        caches: info.caches.clone(),
397                        container_name: runtime
398                            .as_ref()
399                            .and_then(|objects| objects.container_name.clone())
400                            .or_else(|| info.container_name.clone()),
401                        service_network: runtime
402                            .as_ref()
403                            .and_then(|objects| objects.service_network.clone())
404                            .or_else(|| info.service_network.clone()),
405                        service_containers: runtime
406                            .as_ref()
407                            .map(|objects| objects.service_containers.clone())
408                            .unwrap_or_else(|| info.service_containers.clone()),
409                        runtime_summary_path: runtime
410                            .as_ref()
411                            .and_then(|objects| objects.runtime_summary_path.clone())
412                            .or_else(|| info.runtime_summary_path.clone()),
413                        env_vars: info.env_vars.clone(),
414                    },
415                )
416            })
417            .collect();
418        self.history_store
419            .record(&self.run_id, summaries, &resource_map)
420    }
421
    /// Stores the container, service network, service containers and runtime summary
    /// path observed for `job_name` by forwarding them to the runtime state.
    pub(crate) fn record_runtime_objects(
        &self,
        job_name: &str,
        container_name: String,
        service_network: Option<String>,
        service_containers: Vec<String>,
        runtime_summary_path: Option<String>,
    ) {
        self.runtime_state.record_runtime_objects(
            job_name,
            container_name,
            service_network,
            service_containers,
            runtime_summary_path,
        );
    }
438
    /// Delegates to `runtime_summary::write_runtime_summary` for `job_name`.
    ///
    /// Returns whatever that helper reports — presumably the path of the written
    /// summary, if one was produced (confirm in `runtime_summary`).
    pub(crate) fn write_runtime_summary(
        &self,
        job_name: &str,
        container_name: &str,
        service_network: Option<&str>,
        service_containers: &[String],
    ) -> Result<Option<String>> {
        runtime_summary::write_runtime_summary(
            self,
            job_name,
            container_name,
            service_network,
            service_containers,
        )
    }
454
    /// Delegates to `launch::log_job_start` for `planned`, passing the optional UI
    /// bridge through, and returns the resulting run info.
    pub(crate) fn log_job_start(
        &self,
        planned: &ExecutableJob,
        ui: Option<&UiBridge>,
    ) -> Result<JobRunInfo> {
        launch::log_job_start(self, planned, ui)
    }
462
    /// Delegates to `preparer::prepare_job_run` to prepare `job` within `plan`.
    pub(crate) fn prepare_job_run(
        &self,
        plan: &ExecutionPlan,
        job: &JobSpec,
    ) -> Result<preparer::PreparedJobRun> {
        preparer::prepare_job_run(self, plan, job)
    }
470
    /// Asks the artifact manager to collect untracked artifacts of `job` from `workspace`.
    pub(crate) fn collect_untracked_artifacts(
        &self,
        job: &JobSpec,
        workspace: &Path,
    ) -> Result<()> {
        self.artifacts.collect_untracked(job, workspace)
    }
478
    /// Asks the artifact manager to collect the artifacts `job` declares, passing the
    /// workspace, the volume mounts and this executor's container working directory.
    pub(crate) fn collect_declared_artifacts(
        &self,
        job: &JobSpec,
        workspace: &Path,
        mounts: &[crate::pipeline::VolumeMount],
    ) -> Result<()> {
        self.artifacts
            .collect_declared(job, workspace, mounts, &self.container_workdir)
    }
488
    /// Asks the artifact manager to collect the dotenv report of `job`, passing the
    /// workspace, the volume mounts and this executor's container working directory.
    pub(crate) fn collect_dotenv_artifacts(
        &self,
        job: &JobSpec,
        workspace: &Path,
        mounts: &[crate::pipeline::VolumeMount],
    ) -> Result<()> {
        self.artifacts
            .collect_dotenv_report(job, workspace, mounts, &self.container_workdir)
    }
498
    /// Clears the running-container record of `job_name` in the runtime state.
    pub(crate) fn clear_running_container(&self, job_name: &str) {
        self.runtime_state.clear_running_container(job_name);
    }
502
    /// Records `outcome` as the completion result of `job_name` in the runtime state.
    pub(crate) fn record_completed_job(&self, job_name: &str, outcome: ArtifactSourceOutcome) {
        self.runtime_state.record_completed_job(job_name, outcome);
    }
506
    /// Returns the job-name to outcome map reported by the runtime state.
    pub(crate) fn completed_jobs(&self) -> HashMap<String, ArtifactSourceOutcome> {
        self.runtime_state.completed_jobs()
    }
510
    /// Forwards to `RuntimeState::take_cancelled_job`.
    ///
    /// NOTE(review): presumably returns `true` when `job_name` had been marked
    /// cancelled and consumes that mark — confirm in `runtime_state`.
    pub(crate) fn take_cancelled_job(&self, job_name: &str) -> bool {
        self.runtime_state.take_cancelled_job(job_name)
    }
514
515    pub(crate) fn cancel_running_job(&self, job_name: &str) -> bool {
516        let container = self.runtime_state.running_container(job_name);
517        if let Some(container_name) = container {
518            self.runtime_state.mark_job_cancelled(job_name);
519            self.kill_container(job_name, &container_name);
520            true
521        } else {
522            false
523        }
524    }
525
    /// Runs the work described by `ctx` by delegating to `process::execute`.
    pub(crate) fn execute(&self, ctx: ExecuteContext<'_>) -> Result<()> {
        process::execute(self, ctx)
    }
529
530    pub(crate) fn print_job_completion(
531        &self,
532        stage_name: &str,
533        script_path: &Path,
534        log_path: &Path,
535        elapsed: f32,
536    ) {
537        if !self.config.enable_tui {
538            let display = self.display();
539            display::print_line(format!("    script stored at {}", script_path.display()));
540            display::print_line(format!("    log file stored at {}", log_path.display()));
541            let finish_label = display.bold_green("    ✓ finished in");
542            display::print_line(format!("{} {:.2}s", finish_label, elapsed));
543
544            if let Some(stage_elapsed) = self.stage_tracker.complete_job(stage_name) {
545                let stage_footer = display.bold_blue("╰─ stage complete in");
546                display::print_line(format!("{stage_footer} {:.2}s", stage_elapsed));
547            }
548        }
549    }
550
    /// Delegates to `lifecycle::kill_container` for the container of `job_name`.
    pub(crate) fn kill_container(&self, job_name: &str, container_name: &str) {
        lifecycle::kill_container(self, job_name, container_name);
    }
554
    /// Delegates to `lifecycle::cleanup_finished_container` for `container_name`.
    pub(crate) fn cleanup_finished_container(&self, container_name: &str) {
        lifecycle::cleanup_finished_container(self, container_name);
    }
558
    /// Delegates to `launch::resolve_job_image_with_env` to resolve the image of `job`,
    /// forwarding the optional environment lookup table.
    fn resolve_job_image_with_env(
        &self,
        job: &JobSpec,
        env_lookup: Option<&HashMap<String, String>>,
    ) -> Result<crate::model::ImageSpec> {
        launch::resolve_job_image_with_env(self, job, env_lookup)
    }
566
    /// Builds the environment for `job` via `build_job_env`, feeding it the collected
    /// env vars, pipeline default variables, secrets, host and container work
    /// directories, the container root, the run id and the shared environment.
    fn job_env(&self, job: &JobSpec) -> Vec<(String, String)> {
        build_job_env(
            &self.env_vars,
            &self.pipeline.defaults.variables,
            job,
            &self.secrets,
            &self.config.workdir,
            &self.container_workdir,
            Path::new(CONTAINER_ROOT),
            &self.run_id,
            &self.shared_env,
        )
    }
580
581    fn visible_job_env_vars(&self, job: &JobSpec) -> Vec<String> {
582        let mut vars = BTreeSet::new();
583        for (key, _) in self.job_env(job) {
584            if is_user_visible_job_env(&key) {
585                vars.insert(key);
586            }
587        }
588        vars.into_iter().collect()
589    }
590
591    pub(crate) fn expanded_environment(
592        &self,
593        job: &JobSpec,
594    ) -> Option<crate::model::EnvironmentSpec> {
595        let environment = job.environment.as_ref()?;
596        let lookup: HashMap<String, String> = self.job_env(job).into_iter().collect();
597        Some(crate::env::expand_environment(environment, &lookup))
598    }
599
    /// Creates a display formatter honouring this executor's colour setting.
    fn display(&self) -> DisplayFormatter {
        DisplayFormatter::new(self.use_color)
    }
603
604    pub(crate) fn analyze_job_with_default_provider(
605        &self,
606        plan: &ExecutionPlan,
607        job_name: &str,
608        source_name: &str,
609        ui: Option<&UiBridge>,
610    ) {
611        let provider = self
612            .config
613            .settings
614            .ai_settings()
615            .default_provider
616            .unwrap_or(crate::config::AiProviderConfig::Ollama);
617        let provider_kind = match provider {
618            crate::config::AiProviderConfig::Ollama => AiProviderKind::Ollama,
619            crate::config::AiProviderConfig::Claude => AiProviderKind::Claude,
620            crate::config::AiProviderConfig::Codex => AiProviderKind::Codex,
621        };
622        let provider_label = match provider_kind {
623            AiProviderKind::Ollama => "ollama",
624            AiProviderKind::Claude => "claude",
625            AiProviderKind::Codex => "codex",
626        };
627        if let Some(ui) = ui {
628            ui.analysis_started(job_name, provider_label);
629        }
630
631        let outcome = (|| -> Result<Option<PathBuf>> {
632            if provider_kind == AiProviderKind::Ollama
633                && self
634                    .config
635                    .settings
636                    .ai_settings()
637                    .ollama
638                    .model
639                    .trim()
640                    .is_empty()
641            {
642                anyhow::bail!(
643                    "Ollama analysis requires [ai.ollama].model in config; Opal does not choose a default model for you"
644                );
645            }
646            let rendered = self.render_ai_prompt_parts(plan, job_name, source_name)?;
647            let prompt = self.secrets.mask_fragment(&rendered.prompt);
648            let system = rendered
649                .system
650                .as_deref()
651                .map(|text| self.secrets.mask_fragment(text).into_owned());
652
653            let save_path = self.config.settings.ai_settings().save_analysis.then(|| {
654                self.session_dir
655                    .join(job_name_slug(job_name))
656                    .join("analysis")
657                    .join(format!("{provider_label}.md"))
658            });
659
660            let request = AiRequest {
661                provider: provider_kind,
662                prompt: prompt.into_owned(),
663                system,
664                host: (provider_kind == AiProviderKind::Ollama)
665                    .then(|| self.config.settings.ai_settings().ollama.host.clone()),
666                model: match provider_kind {
667                    AiProviderKind::Ollama => {
668                        Some(self.config.settings.ai_settings().ollama.model.clone())
669                    }
670                    AiProviderKind::Codex => self.config.settings.ai_settings().codex.model.clone(),
671                    AiProviderKind::Claude => None,
672                },
673                command: (provider_kind == AiProviderKind::Codex)
674                    .then(|| self.config.settings.ai_settings().codex.command.clone()),
675                args: Vec::new(),
676                workdir: (provider_kind == AiProviderKind::Codex)
677                    .then(|| self.config.workdir.clone()),
678                save_path: save_path.clone(),
679            };
680
681            let result = ai::analyze_with_default_provider(&request, |chunk| {
682                if let (Some(ui), ai::AiChunk::Text(text)) = (ui, chunk) {
683                    ui.analysis_chunk(job_name, &text);
684                }
685            })
686            .map_err(|err| anyhow::anyhow!(err.message))?;
687
688            if let Some(path) = &save_path {
689                if let Some(parent) = path.parent() {
690                    fs::create_dir_all(parent)?;
691                }
692                fs::write(path, result.text)?;
693            }
694
695            Ok(save_path)
696        })();
697
698        if let Some(ui) = ui {
699            match outcome {
700                Ok(saved_path) => ui.analysis_finished(
701                    job_name,
702                    if let Some(path) = &saved_path {
703                        fs::read_to_string(path).unwrap_or_default()
704                    } else {
705                        String::new()
706                    },
707                    saved_path,
708                    None,
709                ),
710                Err(err) => {
711                    ui.analysis_finished(job_name, String::new(), None, Some(err.to_string()))
712                }
713            }
714        }
715    }
716
717    pub(crate) fn render_ai_prompt(
718        &self,
719        plan: &ExecutionPlan,
720        job_name: &str,
721        source_name: &str,
722    ) -> Result<String> {
723        let rendered = self.render_ai_prompt_parts(plan, job_name, source_name)?;
724        let mut text = String::new();
725        if let Some(system) = rendered.system {
726            text.push_str("# System\n\n");
727            text.push_str(system.trim());
728            text.push_str("\n\n");
729        }
730        text.push_str("# Prompt\n\n");
731        text.push_str(rendered.prompt.trim());
732        text.push('\n');
733        Ok(text)
734    }
735
    /// Builds the AI context for `job_name` and renders it with
    /// `render_job_analysis_prompt`, using the work directory and AI settings from
    /// the configuration.
    fn render_ai_prompt_parts(
        &self,
        plan: &ExecutionPlan,
        job_name: &str,
        source_name: &str,
    ) -> Result<crate::ai::RenderedPrompt> {
        let context = self.build_ai_context(plan, job_name, source_name)?;
        render_job_analysis_prompt(
            &self.config.workdir,
            self.config.settings.ai_settings(),
            &context,
        )
    }
749
750    fn build_ai_context(
751        &self,
752        plan: &ExecutionPlan,
753        job_name: &str,
754        source_name: &str,
755    ) -> Result<AiContext> {
756        let planned = plan
757            .nodes
758            .get(job_name)
759            .with_context(|| format!("selected job '{job_name}' not found in execution plan"))?;
760        let runner = self.ui_runner_info_for_job(&planned.instance.job);
761        let runner_summary = format!(
762            "engine={} arch={} vcpu={} ram={}",
763            runner.engine,
764            runner.arch.unwrap_or_else(|| "native/default".to_string()),
765            runner.cpus.unwrap_or_else(|| "engine default".to_string()),
766            runner
767                .memory
768                .unwrap_or_else(|| "engine default".to_string())
769        );
770        let job_yaml = self.load_job_yaml_fragment(source_name)?;
771        let pipeline_summary = format!(
772            "dependencies: {}\nneeds: {}\nallow_failure: {}\ninterruptible: {}",
773            if planned.instance.dependencies.is_empty() {
774                "none".to_string()
775            } else {
776                planned.instance.dependencies.join(", ")
777            },
778            if planned.instance.job.needs.is_empty() {
779                "none".to_string()
780            } else {
781                planned
782                    .instance
783                    .job
784                    .needs
785                    .iter()
786                    .map(|need| {
787                        if need.needs_artifacts {
788                            format!("{} (artifacts)", need.job)
789                        } else {
790                            need.job.clone()
791                        }
792                    })
793                    .collect::<Vec<_>>()
794                    .join(", ")
795            },
796            planned.instance.rule.allow_failure,
797            planned.instance.interruptible,
798        );
799        let runtime_summary = self
800            .runtime_state
801            .runtime_objects(job_name)
802            .and_then(|objects| objects.runtime_summary_path)
803            .and_then(|path| fs::read_to_string(path).ok());
804        let log_excerpt = self.read_job_log_excerpt(&planned.log_path)?;
805
806        Ok(AiContext {
807            job_name: job_name.to_string(),
808            source_name: source_name.to_string(),
809            stage: planned.instance.stage_name.clone(),
810            job_yaml,
811            runner_summary,
812            pipeline_summary,
813            runtime_summary,
814            log_excerpt,
815            failure_hint: None,
816        })
817    }
818
819    fn load_job_yaml_fragment(&self, source_name: &str) -> Result<String> {
820        let content = fs::read_to_string(&self.config.pipeline)
821            .with_context(|| format!("failed to read {}", self.config.pipeline.display()))?;
822        let yaml: serde_yaml::Value = serde_yaml::from_str(&content)
823            .with_context(|| format!("failed to parse {}", self.config.pipeline.display()))?;
824        let Some(mapping) = yaml.as_mapping() else {
825            return Ok(format!("# job '{source_name}' not found"));
826        };
827        for (key, value) in mapping {
828            if key.as_str() == Some(source_name) {
829                let mut root = serde_yaml::Mapping::new();
830                root.insert(
831                    serde_yaml::Value::String(source_name.to_string()),
832                    value.clone(),
833                );
834                return Ok(serde_yaml::to_string(&serde_yaml::Value::Mapping(root))?);
835            }
836        }
837        Ok(format!("# job '{source_name}' not found"))
838    }
839
840    fn read_job_log_excerpt(&self, path: &Path) -> Result<String> {
841        let content = fs::read_to_string(path)
842            .with_context(|| format!("failed to read log {}", path.display()))?;
843        let tail_lines = self.config.settings.ai_settings().tail_lines.max(50);
844        let lines: Vec<&str> = content.lines().collect();
845        let start = lines.len().saturating_sub(tail_lines);
846        Ok(lines[start..].join("\n"))
847    }
848
849    fn ui_runner_info_for_job(&self, job: &JobSpec) -> crate::ui::types::UiRunnerInfo {
850        let engine = match self.config.engine {
851            EngineKind::ContainerCli => "container",
852            EngineKind::Docker => "docker",
853            EngineKind::Podman => "podman",
854            EngineKind::Nerdctl => "nerdctl",
855            EngineKind::Orbstack => "orbstack",
856        }
857        .to_string();
858
859        let job_override = self.config.settings.job_override_for(&job.name);
860        let image_platform = job
861            .image
862            .as_ref()
863            .and_then(|image| image.docker_platform.as_deref());
864        let arch = match self.config.engine {
865            EngineKind::ContainerCli => job_override
866                .as_ref()
867                .and_then(|cfg| cfg.arch.clone())
868                .or_else(|| {
869                    self.config
870                        .settings
871                        .container_settings()
872                        .and_then(|cfg| cfg.arch.clone())
873                })
874                .or_else(|| std::env::var("OPAL_CONTAINER_ARCH").ok())
875                .or_else(|| image_platform.and_then(container_arch_from_platform))
876                .or_else(|| normalize_container_arch(std::env::consts::ARCH)),
877            _ => image_platform
878                .and_then(container_arch_from_platform)
879                .or_else(|| job_override.as_ref().and_then(|cfg| cfg.arch.clone()))
880                .or_else(|| normalize_container_arch(std::env::consts::ARCH)),
881        };
882
883        let (cpus, memory) = match self.config.engine {
884            EngineKind::ContainerCli => {
885                let settings = self.config.settings.container_settings();
886                (
887                    Some(
888                        settings
889                            .and_then(|cfg| cfg.cpus.clone())
890                            .unwrap_or_else(|| "4".to_string()),
891                    ),
892                    Some(
893                        settings
894                            .and_then(|cfg| cfg.memory.clone())
895                            .unwrap_or_else(|| "1638m".to_string()),
896                    ),
897                )
898            }
899            _ => (None, None),
900        };
901
902        crate::ui::types::UiRunnerInfo {
903            engine,
904            arch,
905            cpus,
906            memory,
907        }
908    }
909
910    fn job_log_info(&self, job: &JobSpec) -> (PathBuf, String) {
911        logging::job_log_info(&self.logs_dir, &self.run_id, job)
912    }
913
914    fn container_path_rel(&self, host_path: &Path) -> Result<PathBuf> {
915        paths::to_container_path(
916            host_path,
917            &[
918                (&*self.session_dir, &*self.container_session_dir),
919                (&*self.config.workdir, &*self.container_workdir),
920            ],
921        )
922    }
923}
924
/// Reports whether the environment variable `key` should be treated as user-visible.
///
/// A key is hidden (returns `false`) when it is exactly one of `CI`, `PATH`, `PWD`,
/// `SHLVL`, `GITLAB_CI`, `OPAL_IN_OPAL` or `_`, or when it starts with `CI_`,
/// `GITLAB_` or `OPAL_`. Matching is case-sensitive. Every other key, including the
/// empty string, is visible.
fn is_user_visible_job_env(key: &str) -> bool {
    const HIDDEN_PREFIXES: [&str; 3] = ["CI_", "GITLAB_", "OPAL_"];

    match key {
        "CI" | "PATH" | "PWD" | "SHLVL" | "GITLAB_CI" | "OPAL_IN_OPAL" | "_" => false,
        other => !HIDDEN_PREFIXES
            .iter()
            .any(|prefix| other.starts_with(*prefix)),
    }
}
937
938fn empty_execution_plan() -> ExecutionPlan {
939    ExecutionPlan {
940        ordered: Vec::new(),
941        nodes: HashMap::new(),
942        dependents: HashMap::new(),
943        order_index: HashMap::new(),
944        variants: HashMap::new(),
945    }
946}
947
948fn cache_policy_label(policy: CachePolicySpec) -> &'static str {
949    match policy {
950        CachePolicySpec::Pull => "pull",
951        CachePolicySpec::Push => "push",
952        CachePolicySpec::PullPush => "pull-push",
953    }
954}
955
#[cfg(test)]
mod tests {
    // Intentionally empty: ExecutorCore-specific unit coverage lives in the child modules
    // while the phase 3 extraction continues.
}