Skip to main content

opal/executor/
core.rs

1mod history_store;
2mod launch;
3mod lifecycle;
4mod preparer;
5mod process;
6mod registry;
7mod runtime_state;
8mod runtime_summary;
9mod stage_tracker;
10mod workspace;
11
12use super::{orchestrator, paths};
13use crate::ai::{self, AiContext, AiProviderKind, AiRequest, render_job_analysis_prompt};
14use crate::compiler::compile_pipeline;
15use crate::display::{self, DisplayFormatter, collect_pipeline_plan, print_pipeline_summary};
16use crate::env::{build_job_env, collect_env_vars, expand_env_list};
17use crate::execution_plan::{ExecutableJob, ExecutionPlan, build_execution_plan};
18use crate::executor::container_arch::{container_arch_from_platform, normalize_container_arch};
19use crate::history::{HistoryCache, HistoryEntry};
20use crate::logging;
21use crate::model::{ArtifactSourceOutcome, CachePolicySpec, JobSpec, PipelineSpec};
22use crate::naming::{generate_run_id, job_name_slug};
23use crate::pipeline::{
24    self, ArtifactManager, CacheManager, ExternalArtifactsManager, JobRunInfo, JobSummary,
25    RuleContext,
26};
27use crate::runner::ExecuteContext;
28use crate::secrets::SecretsStore;
29use crate::terminal::should_use_color;
30use crate::ui::{UiBridge, UiHandle, UiJobInfo, UiJobResources};
31use crate::{EngineKind, ExecutorConfig, runtime};
32use anyhow::{Context, Result};
33use std::collections::HashMap;
34use std::env;
35use std::fs;
36use std::path::{Path, PathBuf};
37use std::sync::Arc;
38use tokio::sync::mpsc;
39
/// Root path that `ExecutorCore::new` joins the project directory name onto to
/// form `container_workdir`; it is also handed to `build_job_env` in `job_env`.
40pub(super) const CONTAINER_ROOT: &str = "/builds";
41
/// State shared by everything that happens during one pipeline run: the parsed
/// pipeline, the per-run directories, the resolved environment, and the
/// managers for secrets, artifacts, caches and history.
42#[derive(Debug, Clone)]
43pub struct ExecutorCore {
    /// Configuration the executor was constructed from.
44    pub config: ExecutorConfig,
    /// Pipeline loaded from `config.pipeline` in `new`.
45    pipeline: PipelineSpec,
    /// Result of `should_use_color()`; passed to `DisplayFormatter::new`.
46    use_color: bool,
    /// `scripts` subdirectory of `session_dir`.
47    scripts_dir: PathBuf,
    /// Directory returned by `runtime::logs_dir` for this run id.
48    logs_dir: PathBuf,
    /// Directory returned by `runtime::session_dir` for this run id; `new`
    /// removes any existing copy and recreates it.
49    session_dir: PathBuf,
    /// `/opal/<run_id>`.
50    container_session_dir: PathBuf,
    /// Identifier produced by `generate_run_id` for this run.
51    run_id: String,
    /// True when `config.trace_scripts` is set or `OPAL_DEBUG` is `1`/`true`.
52    verbose_scripts: bool,
    /// Variables collected from `config.env_includes`, expanded against the
    /// process environment.
53    env_vars: Vec<(String, String)>,
    /// Process environment overlaid with `env_vars` and then the secret pairs.
54    shared_env: HashMap<String, String>,
    /// `CONTAINER_ROOT` joined with the final component of `config.workdir`
    /// (`"project"` when that component is missing or not UTF-8).
55    container_workdir: PathBuf,
    /// Built from each stage's name and job count.
56    stage_tracker: stage_tracker::StageTracker,
    /// Starts out as `RuntimeState::default()`; queried for running containers,
    /// completed/cancelled jobs and recorded runtime objects.
57    runtime_state: runtime_state::RuntimeState,
    /// Loaded from `runtime::history_path()`.
58    history_store: history_store::HistoryStore,
    /// Loaded from `config.workdir`.
59    secrets: SecretsStore,
    /// Artifact manager constructed with `session_dir`.
60    artifacts: ArtifactManager,
    /// Cache manager constructed with `runtime::cache_root()`.
61    cache: CacheManager,
    /// Present only when `config.gitlab` is configured.
62    external_artifacts: Option<ExternalArtifactsManager>,
63}
64
/// Per-job resource description built at planning time by
/// `collect_job_resources` and later fed to the UI and the history store.
65#[derive(Debug, Clone)]
66struct JobResourceInfo {
    /// Artifact root for the job; `None` when the job declares no artifact paths.
67    artifact_dir: Option<String>,
    /// Declared artifact paths rendered as strings.
68    artifact_paths: Vec<String>,
    /// Cache entries described by the cache manager for this job.
69    caches: Vec<HistoryCache>,
    // The remaining fields are left empty by `collect_job_resources`;
    // `record_pipeline_history` prefers values recorded in `runtime_state` and
    // only falls back to these.
70    container_name: Option<String>,
71    service_network: Option<String>,
72    service_containers: Vec<String>,
73    runtime_summary_path: Option<String>,
74}
75
76impl ExecutorCore {
77    // TODO: this constructor does too much; the fs::create_dir_all calls inside it make it hard to test
78    pub fn new(config: ExecutorConfig) -> Result<Self> {
79        let pipeline =
80            PipelineSpec::from_path_with_gitlab(&config.pipeline, config.gitlab.as_ref())?;
81        let run_id = generate_run_id(&config);
82        let runs_root = runtime::runs_root();
83        fs::create_dir_all(&runs_root)
84            .with_context(|| format!("failed to create {:?}", runs_root))?;
85
86        let session_dir = runtime::session_dir(&run_id);
87        if session_dir.exists() {
88            fs::remove_dir_all(&session_dir)
89                .with_context(|| format!("failed to clean {:?}", session_dir))?;
90        }
91        fs::create_dir_all(&session_dir)
92            .with_context(|| format!("failed to create {:?}", session_dir))?;
93
94        let scripts_dir = session_dir.join("scripts");
95        fs::create_dir_all(&scripts_dir)
96            .with_context(|| format!("failed to create {:?}", scripts_dir))?;
97
98        let logs_dir = runtime::logs_dir(&run_id);
99        fs::create_dir_all(&logs_dir)
100            .with_context(|| format!("failed to create {:?}", logs_dir))?;
101
102        let history_store = history_store::HistoryStore::load(runtime::history_path());
103
104        let use_color = should_use_color();
105        let env_verbose = env::var_os("OPAL_DEBUG")
106            .map(|val| {
107                let s = val.to_string_lossy();
108                s == "1" || s.eq_ignore_ascii_case("true")
109            })
110            .unwrap_or(false);
111        let verbose_scripts = config.trace_scripts || env_verbose;
112        let mut env_vars = collect_env_vars(&config.env_includes)?;
113        let mut shared_env: HashMap<String, String> = env::vars().collect();
114        expand_env_list(&mut env_vars[..], &shared_env);
115        shared_env.extend(env_vars.iter().cloned());
116        let stage_specs: Vec<(String, usize)> = pipeline
117            .stages
118            .iter()
119            .map(|stage| (stage.name.clone(), stage.jobs.len()))
120            .collect();
121        let stage_tracker = stage_tracker::StageTracker::new(&stage_specs);
122
123        let secrets = SecretsStore::load(&config.workdir)?;
124        shared_env.extend(secrets.env_pairs());
125        let artifacts = ArtifactManager::new(session_dir.clone());
126        let cache_root = runtime::cache_root();
127        fs::create_dir_all(&cache_root)
128            .with_context(|| format!("failed to create cache root {:?}", cache_root))?;
129        let cache = CacheManager::new(cache_root);
130        let external_artifacts = config.gitlab.as_ref().map(|cfg| {
131            ExternalArtifactsManager::new(
132                session_dir.clone(),
133                cfg.base_url.clone(),
134                cfg.token.clone(),
135            )
136        });
137        let project_dir = config
138            .workdir
139            .file_name()
140            .and_then(|n| n.to_str())
141            .unwrap_or("project");
142        let container_workdir = Path::new(CONTAINER_ROOT).join(project_dir);
143        let container_session_dir = Path::new("/opal").join(&run_id);
144
145        let core = Self {
146            config,
147            pipeline,
148            use_color,
149            scripts_dir,
150            logs_dir,
151            session_dir,
152            container_session_dir,
153            run_id,
154            verbose_scripts,
155            env_vars,
156            shared_env,
157            container_workdir,
158            stage_tracker,
159            runtime_state: runtime_state::RuntimeState::default(),
160            history_store,
161            secrets,
162            artifacts,
163            cache,
164            external_artifacts,
165        };
166
167        registry::ensure_registry_logins(&core)?;
168
169        Ok(core)
170    }
171
172    pub async fn run(&self) -> Result<()> {
173        let plan = Arc::new(self.plan_jobs()?);
174        let resource_map = self.collect_job_resources(&plan);
175        let display = self.display();
176        let plan_text = collect_pipeline_plan(&display, &plan).join("\n");
177        let ui_resources = Self::convert_ui_resources(&resource_map);
178        let history_snapshot = self.history_store.snapshot();
179        let ui_handle = if self.config.enable_tui {
180            let variant_sources: HashMap<String, String> = plan
181                .variants
182                .iter()
183                .flat_map(|(source, variants)| {
184                    variants
185                        .iter()
186                        .map(move |variant| (variant.name.clone(), source.clone()))
187                })
188                .collect();
189            let jobs: Vec<UiJobInfo> = plan
190                .ordered
191                .iter()
192                .filter_map(|name| plan.nodes.get(name))
193                .map(|planned| UiJobInfo {
194                    name: planned.instance.job.name.clone(),
195                    source_name: variant_sources
196                        .get(&planned.instance.job.name)
197                        .cloned()
198                        .unwrap_or_else(|| planned.instance.job.name.clone()),
199                    stage: planned.instance.stage_name.clone(),
200                    log_path: planned.log_path.clone(),
201                    log_hash: planned.log_hash.clone(),
202                    runner: self.ui_runner_info_for_job(&planned.instance.job),
203                })
204                .collect();
205            Some(UiHandle::start(
206                jobs,
207                history_snapshot,
208                self.run_id.clone(),
209                ui_resources,
210                plan_text,
211                self.config.workdir.clone(),
212                self.config.pipeline.clone(),
213            )?)
214        } else {
215            None
216        };
217        let mut owned_command_rx = if !self.config.enable_tui {
218            let (tx, rx) = mpsc::unbounded_channel();
219            if let Ok(raw) = env::var("OPAL_ABORT_AFTER_SECS")
220                && let Ok(seconds) = raw.parse::<u64>()
221                && seconds > 0
222            {
223                tokio::spawn(async move {
224                    tokio::time::sleep(std::time::Duration::from_secs(seconds)).await;
225                    let _ = tx.send(crate::ui::UiCommand::AbortPipeline);
226                });
227            }
228            Some(rx)
229        } else {
230            None
231        };
232        let mut ui_command_rx = ui_handle
233            .as_ref()
234            .and_then(|handle| handle.command_receiver());
235        let command_rx = ui_command_rx.as_mut().or(owned_command_rx.as_mut());
236        let ui_bridge = ui_handle.as_ref().map(|handle| Arc::new(handle.bridge()));
237
238        let (mut summaries, result) =
239            orchestrator::execute_plan(self, plan.clone(), ui_bridge.clone(), command_rx).await;
240
241        if let Some(handle) = &ui_handle {
242            handle.pipeline_finished();
243        }
244
245        if let Some(commands) = ui_command_rx.as_mut() {
246            orchestrator::handle_restart_commands(
247                self,
248                plan.clone(),
249                ui_bridge.clone(),
250                commands,
251                &mut summaries,
252            )
253            .await?;
254        }
255
256        let history_entry = self.record_pipeline_history(&summaries, &resource_map);
257        if let (Some(entry), Some(ui)) = (history_entry, ui_bridge.as_deref()) {
258            ui.history_updated(entry);
259        }
260
261        if let Some(handle) = ui_handle {
262            handle.wait_for_exit();
263        }
264
265        if !self.config.enable_tui {
266            print_pipeline_summary(
267                &display,
268                &plan,
269                &summaries,
270                &self.session_dir,
271                display::print_line,
272            );
273        }
274        result
275    }
276
277    fn plan_jobs(&self) -> Result<ExecutionPlan> {
278        let run_manual = env::var("OPAL_RUN_MANUAL").is_ok_and(|v| v == "1");
279        let ctx = RuleContext::from_env(&self.config.workdir, self.shared_env.clone(), run_manual);
280        ctx.ensure_valid_tag_context()?;
281        if !pipeline::rules::filters_allow(&self.pipeline.filters, &ctx) {
282            return Ok(empty_execution_plan());
283        }
284        if let Some(workflow) = &self.pipeline.workflow
285            && !pipeline::rules::evaluate_workflow(&workflow.rules, &ctx)?
286        {
287            return Ok(empty_execution_plan());
288        }
289        let compiled = compile_pipeline(&self.pipeline, Some(&ctx))?;
290        let mut plan = build_execution_plan(compiled, |job| self.job_log_info(job))?;
291        if !self.config.selected_jobs.is_empty() {
292            plan = plan.select_jobs(&self.config.selected_jobs)?;
293        }
294        Ok(plan)
295    }
296
297    fn collect_job_resources(&self, plan: &ExecutionPlan) -> HashMap<String, JobResourceInfo> {
298        plan.nodes
299            .values()
300            .map(|planned| {
301                // TODO: 50 lines of code inside a map, when this explodes, what happens - refactor
302                let artifact_dir = if planned.instance.job.artifacts.paths.is_empty() {
303                    None
304                } else {
305                    Some(
306                        self.artifacts
307                            .job_artifacts_root(&planned.instance.job.name)
308                            .display()
309                            .to_string(),
310                    )
311                };
312                let artifact_paths = planned
313                    .instance
314                    .job
315                    .artifacts
316                    .paths
317                    .iter()
318                    .map(|path| path.display().to_string())
319                    .collect();
320                let env_vars = self.job_env(&planned.instance.job);
321                let cache_env: HashMap<String, String> = env_vars.iter().cloned().collect();
322                let caches = self
323                    .cache
324                    .describe_entries(
325                        &planned.instance.job.cache,
326                        &self.config.workdir,
327                        &cache_env,
328                    )
329                    .into_iter()
330                    .map(|entry| HistoryCache {
331                        key: entry.key,
332                        policy: cache_policy_label(entry.policy).to_string(),
333                        host: entry.host.display().to_string(),
334                        paths: entry
335                            .paths
336                            .iter()
337                            .map(|path| path.display().to_string())
338                            .collect(),
339                    })
340                    .collect();
341                (
342                    planned.instance.job.name.clone(),
343                    JobResourceInfo {
344                        artifact_dir,
345                        artifact_paths,
346                        caches,
347                        container_name: None,
348                        service_network: None,
349                        service_containers: Vec::new(),
350                        runtime_summary_path: None,
351                    },
352                )
353            })
354            .collect()
355    }
356
357    fn convert_ui_resources(
358        resources: &HashMap<String, JobResourceInfo>,
359    ) -> HashMap<String, UiJobResources> {
360        resources
361            .iter()
362            .map(|(name, info)| {
363                (
364                    name.clone(),
365                    UiJobResources {
366                        artifact_dir: info.artifact_dir.clone(),
367                        artifact_paths: info.artifact_paths.clone(),
368                        caches: info.caches.clone(),
369                        container_name: info.container_name.clone(),
370                        service_network: info.service_network.clone(),
371                        service_containers: info.service_containers.clone(),
372                        runtime_summary_path: info.runtime_summary_path.clone(),
373                    },
374                )
375            })
376            .collect()
377    }
378
379    fn record_pipeline_history(
380        &self,
381        summaries: &[JobSummary],
382        resources: &HashMap<String, JobResourceInfo>,
383    ) -> Option<HistoryEntry> {
384        let resource_map = resources
385            .iter()
386            .map(|(name, info)| {
387                let runtime = self.runtime_state.runtime_objects(name);
388                (
389                    name.clone(),
390                    history_store::HistoryResources {
391                        artifact_dir: info.artifact_dir.clone(),
392                        artifacts: info.artifact_paths.clone(),
393                        caches: info.caches.clone(),
394                        container_name: runtime
395                            .as_ref()
396                            .and_then(|objects| objects.container_name.clone())
397                            .or_else(|| info.container_name.clone()),
398                        service_network: runtime
399                            .as_ref()
400                            .and_then(|objects| objects.service_network.clone())
401                            .or_else(|| info.service_network.clone()),
402                        service_containers: runtime
403                            .as_ref()
404                            .map(|objects| objects.service_containers.clone())
405                            .unwrap_or_else(|| info.service_containers.clone()),
406                        runtime_summary_path: runtime
407                            .as_ref()
408                            .and_then(|objects| objects.runtime_summary_path.clone())
409                            .or_else(|| info.runtime_summary_path.clone()),
410                    },
411                )
412            })
413            .collect();
414        self.history_store
415            .record(&self.run_id, summaries, &resource_map)
416    }
417
418    pub(crate) fn record_runtime_objects(
419        &self,
420        job_name: &str,
421        container_name: String,
422        service_network: Option<String>,
423        service_containers: Vec<String>,
424        runtime_summary_path: Option<String>,
425    ) {
426        self.runtime_state.record_runtime_objects(
427            job_name,
428            container_name,
429            service_network,
430            service_containers,
431            runtime_summary_path,
432        );
433    }
434
435    pub(crate) fn write_runtime_summary(
436        &self,
437        job_name: &str,
438        container_name: &str,
439        service_network: Option<&str>,
440        service_containers: &[String],
441    ) -> Result<Option<String>> {
442        runtime_summary::write_runtime_summary(
443            self,
444            job_name,
445            container_name,
446            service_network,
447            service_containers,
448        )
449    }
450
451    pub(crate) fn log_job_start(
452        &self,
453        planned: &ExecutableJob,
454        ui: Option<&UiBridge>,
455    ) -> Result<JobRunInfo> {
456        launch::log_job_start(self, planned, ui)
457    }
458
459    pub(crate) fn prepare_job_run(
460        &self,
461        plan: &ExecutionPlan,
462        job: &JobSpec,
463    ) -> Result<preparer::PreparedJobRun> {
464        preparer::prepare_job_run(self, plan, job)
465    }
466
467    pub(crate) fn collect_untracked_artifacts(
468        &self,
469        job: &JobSpec,
470        workspace: &Path,
471    ) -> Result<()> {
472        self.artifacts.collect_untracked(job, workspace)
473    }
474
475    pub(crate) fn collect_declared_artifacts(
476        &self,
477        job: &JobSpec,
478        workspace: &Path,
479        mounts: &[crate::pipeline::VolumeMount],
480    ) -> Result<()> {
481        self.artifacts
482            .collect_declared(job, workspace, mounts, &self.container_workdir)
483    }
484
485    pub(crate) fn collect_dotenv_artifacts(
486        &self,
487        job: &JobSpec,
488        workspace: &Path,
489        mounts: &[crate::pipeline::VolumeMount],
490    ) -> Result<()> {
491        self.artifacts
492            .collect_dotenv_report(job, workspace, mounts, &self.container_workdir)
493    }
494
495    pub(crate) fn clear_running_container(&self, job_name: &str) {
496        self.runtime_state.clear_running_container(job_name);
497    }
498
499    pub(crate) fn record_completed_job(&self, job_name: &str, outcome: ArtifactSourceOutcome) {
500        self.runtime_state.record_completed_job(job_name, outcome);
501    }
502
503    pub(crate) fn completed_jobs(&self) -> HashMap<String, ArtifactSourceOutcome> {
504        self.runtime_state.completed_jobs()
505    }
506
507    pub(crate) fn take_cancelled_job(&self, job_name: &str) -> bool {
508        self.runtime_state.take_cancelled_job(job_name)
509    }
510
511    pub(crate) fn cancel_running_job(&self, job_name: &str) -> bool {
512        let container = self.runtime_state.running_container(job_name);
513        if let Some(container_name) = container {
514            self.runtime_state.mark_job_cancelled(job_name);
515            self.kill_container(job_name, &container_name);
516            true
517        } else {
518            false
519        }
520    }
521
522    pub(crate) fn execute(&self, ctx: ExecuteContext<'_>) -> Result<()> {
523        process::execute(self, ctx)
524    }
525
526    pub(crate) fn print_job_completion(
527        &self,
528        stage_name: &str,
529        script_path: &Path,
530        log_path: &Path,
531        elapsed: f32,
532    ) {
533        if !self.config.enable_tui {
534            let display = self.display();
535            display::print_line(format!("    script stored at {}", script_path.display()));
536            display::print_line(format!("    log file stored at {}", log_path.display()));
537            let finish_label = display.bold_green("    ✓ finished in");
538            display::print_line(format!("{} {:.2}s", finish_label, elapsed));
539
540            if let Some(stage_elapsed) = self.stage_tracker.complete_job(stage_name) {
541                let stage_footer = display.bold_blue("╰─ stage complete in");
542                display::print_line(format!("{stage_footer} {:.2}s", stage_elapsed));
543            }
544        }
545    }
546
547    pub(crate) fn kill_container(&self, job_name: &str, container_name: &str) {
548        lifecycle::kill_container(self, job_name, container_name);
549    }
550
551    pub(crate) fn cleanup_finished_container(&self, container_name: &str) {
552        lifecycle::cleanup_finished_container(self, container_name);
553    }
554
555    fn resolve_job_image_with_env(
556        &self,
557        job: &JobSpec,
558        env_lookup: Option<&HashMap<String, String>>,
559    ) -> Result<crate::model::ImageSpec> {
560        launch::resolve_job_image_with_env(self, job, env_lookup)
561    }
562
563    fn job_env(&self, job: &JobSpec) -> Vec<(String, String)> {
564        build_job_env(
565            &self.env_vars,
566            &self.pipeline.defaults.variables,
567            job,
568            &self.secrets,
569            &self.config.workdir,
570            &self.container_workdir,
571            Path::new(CONTAINER_ROOT),
572            &self.run_id,
573            &self.shared_env,
574        )
575    }
576
577    pub(crate) fn expanded_environment(
578        &self,
579        job: &JobSpec,
580    ) -> Option<crate::model::EnvironmentSpec> {
581        let environment = job.environment.as_ref()?;
582        let lookup: HashMap<String, String> = self.job_env(job).into_iter().collect();
583        Some(crate::env::expand_environment(environment, &lookup))
584    }
585
586    fn display(&self) -> DisplayFormatter {
587        DisplayFormatter::new(self.use_color)
588    }
589
590    pub(crate) fn analyze_job_with_default_provider(
591        &self,
592        plan: &ExecutionPlan,
593        job_name: &str,
594        source_name: &str,
595        ui: Option<&UiBridge>,
596    ) {
597        let provider = self
598            .config
599            .settings
600            .ai_settings()
601            .default_provider
602            .unwrap_or(crate::config::AiProviderConfig::Ollama);
603        let provider_kind = match provider {
604            crate::config::AiProviderConfig::Ollama => AiProviderKind::Ollama,
605            crate::config::AiProviderConfig::Claude => AiProviderKind::Claude,
606            crate::config::AiProviderConfig::Codex => AiProviderKind::Codex,
607        };
608        let provider_label = match provider_kind {
609            AiProviderKind::Ollama => "ollama",
610            AiProviderKind::Claude => "claude",
611            AiProviderKind::Codex => "codex",
612        };
613        if let Some(ui) = ui {
614            ui.analysis_started(job_name, provider_label);
615        }
616
617        let outcome = (|| -> Result<Option<PathBuf>> {
618            if provider_kind == AiProviderKind::Ollama
619                && self
620                    .config
621                    .settings
622                    .ai_settings()
623                    .ollama
624                    .model
625                    .trim()
626                    .is_empty()
627            {
628                anyhow::bail!(
629                    "Ollama analysis requires [ai.ollama].model in config; Opal does not choose a default model for you"
630                );
631            }
632            let rendered = self.render_ai_prompt_parts(plan, job_name, source_name)?;
633            let prompt = self.secrets.mask_fragment(&rendered.prompt);
634            let system = rendered
635                .system
636                .as_deref()
637                .map(|text| self.secrets.mask_fragment(text).into_owned());
638
639            let save_path = self.config.settings.ai_settings().save_analysis.then(|| {
640                self.session_dir
641                    .join(job_name_slug(job_name))
642                    .join("analysis")
643                    .join(format!("{provider_label}.md"))
644            });
645
646            let request = AiRequest {
647                provider: provider_kind,
648                prompt: prompt.into_owned(),
649                system,
650                host: (provider_kind == AiProviderKind::Ollama)
651                    .then(|| self.config.settings.ai_settings().ollama.host.clone()),
652                model: match provider_kind {
653                    AiProviderKind::Ollama => {
654                        Some(self.config.settings.ai_settings().ollama.model.clone())
655                    }
656                    AiProviderKind::Codex => self.config.settings.ai_settings().codex.model.clone(),
657                    AiProviderKind::Claude => None,
658                },
659                command: (provider_kind == AiProviderKind::Codex)
660                    .then(|| self.config.settings.ai_settings().codex.command.clone()),
661                args: Vec::new(),
662                workdir: (provider_kind == AiProviderKind::Codex)
663                    .then(|| self.config.workdir.clone()),
664                save_path: save_path.clone(),
665            };
666
667            let result = ai::analyze_with_default_provider(&request, |chunk| {
668                if let (Some(ui), ai::AiChunk::Text(text)) = (ui, chunk) {
669                    ui.analysis_chunk(job_name, &text);
670                }
671            })
672            .map_err(|err| anyhow::anyhow!(err.message))?;
673
674            if let Some(path) = &save_path {
675                if let Some(parent) = path.parent() {
676                    fs::create_dir_all(parent)?;
677                }
678                fs::write(path, result.text)?;
679            }
680
681            Ok(save_path)
682        })();
683
684        if let Some(ui) = ui {
685            match outcome {
686                Ok(saved_path) => ui.analysis_finished(
687                    job_name,
688                    if let Some(path) = &saved_path {
689                        fs::read_to_string(path).unwrap_or_default()
690                    } else {
691                        String::new()
692                    },
693                    saved_path,
694                    None,
695                ),
696                Err(err) => {
697                    ui.analysis_finished(job_name, String::new(), None, Some(err.to_string()))
698                }
699            }
700        }
701    }
702
703    pub(crate) fn render_ai_prompt(
704        &self,
705        plan: &ExecutionPlan,
706        job_name: &str,
707        source_name: &str,
708    ) -> Result<String> {
709        let rendered = self.render_ai_prompt_parts(plan, job_name, source_name)?;
710        let mut text = String::new();
711        if let Some(system) = rendered.system {
712            text.push_str("# System\n\n");
713            text.push_str(system.trim());
714            text.push_str("\n\n");
715        }
716        text.push_str("# Prompt\n\n");
717        text.push_str(rendered.prompt.trim());
718        text.push('\n');
719        Ok(text)
720    }
721
722    fn render_ai_prompt_parts(
723        &self,
724        plan: &ExecutionPlan,
725        job_name: &str,
726        source_name: &str,
727    ) -> Result<crate::ai::RenderedPrompt> {
728        let context = self.build_ai_context(plan, job_name, source_name)?;
729        render_job_analysis_prompt(
730            &self.config.workdir,
731            self.config.settings.ai_settings(),
732            &context,
733        )
734    }
735
736    fn build_ai_context(
737        &self,
738        plan: &ExecutionPlan,
739        job_name: &str,
740        source_name: &str,
741    ) -> Result<AiContext> {
742        let planned = plan
743            .nodes
744            .get(job_name)
745            .with_context(|| format!("selected job '{job_name}' not found in execution plan"))?;
746        let runner = self.ui_runner_info_for_job(&planned.instance.job);
747        let runner_summary = format!(
748            "engine={} arch={} vcpu={} ram={}",
749            runner.engine,
750            runner.arch.unwrap_or_else(|| "native/default".to_string()),
751            runner.cpus.unwrap_or_else(|| "engine default".to_string()),
752            runner
753                .memory
754                .unwrap_or_else(|| "engine default".to_string())
755        );
756        let job_yaml = self.load_job_yaml_fragment(source_name)?;
757        let pipeline_summary = format!(
758            "dependencies: {}\nneeds: {}\nallow_failure: {}\ninterruptible: {}",
759            if planned.instance.dependencies.is_empty() {
760                "none".to_string()
761            } else {
762                planned.instance.dependencies.join(", ")
763            },
764            if planned.instance.job.needs.is_empty() {
765                "none".to_string()
766            } else {
767                planned
768                    .instance
769                    .job
770                    .needs
771                    .iter()
772                    .map(|need| {
773                        if need.needs_artifacts {
774                            format!("{} (artifacts)", need.job)
775                        } else {
776                            need.job.clone()
777                        }
778                    })
779                    .collect::<Vec<_>>()
780                    .join(", ")
781            },
782            planned.instance.rule.allow_failure,
783            planned.instance.interruptible,
784        );
785        let runtime_summary = self
786            .runtime_state
787            .runtime_objects(job_name)
788            .and_then(|objects| objects.runtime_summary_path)
789            .and_then(|path| fs::read_to_string(path).ok());
790        let log_excerpt = self.read_job_log_excerpt(&planned.log_path)?;
791
792        Ok(AiContext {
793            job_name: job_name.to_string(),
794            source_name: source_name.to_string(),
795            stage: planned.instance.stage_name.clone(),
796            job_yaml,
797            runner_summary,
798            pipeline_summary,
799            runtime_summary,
800            log_excerpt,
801            failure_hint: None,
802        })
803    }
804
805    fn load_job_yaml_fragment(&self, source_name: &str) -> Result<String> {
806        let content = fs::read_to_string(&self.config.pipeline)
807            .with_context(|| format!("failed to read {}", self.config.pipeline.display()))?;
808        let yaml: serde_yaml::Value = serde_yaml::from_str(&content)
809            .with_context(|| format!("failed to parse {}", self.config.pipeline.display()))?;
810        let Some(mapping) = yaml.as_mapping() else {
811            return Ok(format!("# job '{source_name}' not found"));
812        };
813        for (key, value) in mapping {
814            if key.as_str() == Some(source_name) {
815                let mut root = serde_yaml::Mapping::new();
816                root.insert(
817                    serde_yaml::Value::String(source_name.to_string()),
818                    value.clone(),
819                );
820                return Ok(serde_yaml::to_string(&serde_yaml::Value::Mapping(root))?);
821            }
822        }
823        Ok(format!("# job '{source_name}' not found"))
824    }
825
826    fn read_job_log_excerpt(&self, path: &Path) -> Result<String> {
827        let content = fs::read_to_string(path)
828            .with_context(|| format!("failed to read log {}", path.display()))?;
829        let tail_lines = self.config.settings.ai_settings().tail_lines.max(50);
830        let lines: Vec<&str> = content.lines().collect();
831        let start = lines.len().saturating_sub(tail_lines);
832        Ok(lines[start..].join("\n"))
833    }
834
835    fn ui_runner_info_for_job(&self, job: &JobSpec) -> crate::ui::types::UiRunnerInfo {
836        let engine = match self.config.engine {
837            EngineKind::ContainerCli => "container",
838            EngineKind::Docker => "docker",
839            EngineKind::Podman => "podman",
840            EngineKind::Nerdctl => "nerdctl",
841            EngineKind::Orbstack => "orbstack",
842        }
843        .to_string();
844
845        let job_override = self.config.settings.job_override_for(&job.name);
846        let image_platform = job
847            .image
848            .as_ref()
849            .and_then(|image| image.docker_platform.as_deref());
850        let arch = match self.config.engine {
851            EngineKind::ContainerCli => job_override
852                .as_ref()
853                .and_then(|cfg| cfg.arch.clone())
854                .or_else(|| {
855                    self.config
856                        .settings
857                        .container_settings()
858                        .and_then(|cfg| cfg.arch.clone())
859                })
860                .or_else(|| std::env::var("OPAL_CONTAINER_ARCH").ok())
861                .or_else(|| image_platform.and_then(container_arch_from_platform))
862                .or_else(|| normalize_container_arch(std::env::consts::ARCH)),
863            _ => image_platform
864                .and_then(container_arch_from_platform)
865                .or_else(|| job_override.as_ref().and_then(|cfg| cfg.arch.clone()))
866                .or_else(|| normalize_container_arch(std::env::consts::ARCH)),
867        };
868
869        let (cpus, memory) = match self.config.engine {
870            EngineKind::ContainerCli => {
871                let settings = self.config.settings.container_settings();
872                (
873                    Some(
874                        settings
875                            .and_then(|cfg| cfg.cpus.clone())
876                            .unwrap_or_else(|| "4".to_string()),
877                    ),
878                    Some(
879                        settings
880                            .and_then(|cfg| cfg.memory.clone())
881                            .unwrap_or_else(|| "1638m".to_string()),
882                    ),
883                )
884            }
885            _ => (None, None),
886        };
887
888        crate::ui::types::UiRunnerInfo {
889            engine,
890            arch,
891            cpus,
892            memory,
893        }
894    }
895
896    fn job_log_info(&self, job: &JobSpec) -> (PathBuf, String) {
897        logging::job_log_info(&self.logs_dir, &self.run_id, job)
898    }
899
900    fn container_path_rel(&self, host_path: &Path) -> Result<PathBuf> {
901        paths::to_container_path(
902            host_path,
903            &[
904                (&*self.session_dir, &*self.container_session_dir),
905                (&*self.config.workdir, &*self.container_workdir),
906            ],
907        )
908    }
909}
910
911fn empty_execution_plan() -> ExecutionPlan {
912    ExecutionPlan {
913        ordered: Vec::new(),
914        nodes: HashMap::new(),
915        dependents: HashMap::new(),
916        order_index: HashMap::new(),
917        variants: HashMap::new(),
918    }
919}
920
921fn cache_policy_label(policy: CachePolicySpec) -> &'static str {
922    match policy {
923        CachePolicySpec::Pull => "pull",
924        CachePolicySpec::Push => "push",
925        CachePolicySpec::PullPush => "pull-push",
926    }
927}
928
#[cfg(test)]
mod tests {
    // Intentionally empty for now: unit coverage specific to ExecutorCore is kept in the child
    // modules while the phase 3 extraction is still in progress.
}