1mod history_store;
2mod launch;
3mod lifecycle;
4mod preparer;
5mod process;
6mod registry;
7mod runtime_state;
8mod runtime_summary;
9mod stage_tracker;
10mod workspace;
11
12use super::{orchestrator, paths};
13use crate::ai::{self, AiContext, AiProviderKind, AiRequest, render_job_analysis_prompt};
14use crate::compiler::compile_pipeline;
15use crate::display::{self, DisplayFormatter, collect_pipeline_plan, print_pipeline_summary};
16use crate::env::{build_job_env, collect_env_vars, expand_env_list};
17use crate::execution_plan::{ExecutableJob, ExecutionPlan, build_execution_plan};
18use crate::executor::container_arch::{container_arch_from_platform, normalize_container_arch};
19use crate::history::{HistoryCache, HistoryEntry};
20use crate::logging;
21use crate::model::{ArtifactSourceOutcome, CachePolicySpec, JobSpec, PipelineSpec};
22use crate::naming::{generate_run_id, job_name_slug};
23use crate::pipeline::{
24 self, ArtifactManager, CacheManager, ExternalArtifactsManager, JobRunInfo, JobSummary,
25 RuleContext,
26};
27use crate::runner::ExecuteContext;
28use crate::secrets::SecretsStore;
29use crate::terminal::should_use_color;
30use crate::ui::{UiBridge, UiHandle, UiJobInfo, UiJobResources};
31use crate::{EngineKind, ExecutorConfig, runtime};
32use anyhow::{Context, Result};
33use std::collections::{BTreeSet, HashMap};
34use std::env;
35use std::fs;
36use std::path::{Path, PathBuf};
37use std::sync::Arc;
38use tokio::sync::mpsc;
39
/// Root directory inside job containers; the project workspace is placed at
/// `CONTAINER_ROOT/<project-dir-name>` (see `ExecutorCore::new`).
pub(super) const CONTAINER_ROOT: &str = "/builds";
41
/// Shared state for one pipeline run.
///
/// Built once by [`ExecutorCore::new`] and then borrowed by the orchestrator
/// and the sibling submodules (launch, preparer, process, lifecycle, ...).
#[derive(Debug, Clone)]
pub struct ExecutorCore {
    /// User-supplied executor configuration (engine, workdir, settings, flags).
    pub config: ExecutorConfig,
    /// Pipeline specification parsed from `config.pipeline`.
    pipeline: PipelineSpec,
    /// Whether terminal output is colorized (from `should_use_color`).
    use_color: bool,
    /// `<session_dir>/scripts` — generated job scripts live here.
    scripts_dir: PathBuf,
    /// Per-run log directory (from `runtime::logs_dir`).
    logs_dir: PathBuf,
    /// Per-run session directory on the host; recreated fresh in `new`.
    session_dir: PathBuf,
    /// Container-side mount point of the session dir (`/opal/<run_id>`).
    container_session_dir: PathBuf,
    /// Unique identifier for this run (from `generate_run_id`).
    run_id: String,
    /// Script tracing enabled via `config.trace_scripts` or `OPAL_DEBUG`.
    verbose_scripts: bool,
    /// Variables collected from `config.env_includes`, after expansion.
    env_vars: Vec<(String, String)>,
    /// Process env merged with `env_vars` and secret env pairs.
    shared_env: HashMap<String, String>,
    /// Container-side project directory (`/builds/<project-dir-name>`).
    container_workdir: PathBuf,
    /// Tracks per-stage job completion for timing footers.
    stage_tracker: stage_tracker::StageTracker,
    /// Mutable runtime bookkeeping (running containers, completions, cancels).
    runtime_state: runtime_state::RuntimeState,
    /// Persistent run history (loaded from `runtime::history_path`).
    history_store: history_store::HistoryStore,
    /// Secrets loaded from the workdir; used for env and output masking.
    secrets: SecretsStore,
    /// Collects job artifacts under the session dir.
    artifacts: ArtifactManager,
    /// Cache entries rooted at `runtime::cache_root`.
    cache: CacheManager,
    /// Present only when a GitLab configuration was provided.
    external_artifacts: Option<ExternalArtifactsManager>,
}
64
/// Per-job resource description collected up front for the UI and history.
///
/// The runtime-only fields (`container_name`, `service_network`,
/// `service_containers`, `runtime_summary_path`) start out empty in
/// `collect_job_resources` and are superseded by `runtime_state` values when
/// history is recorded.
#[derive(Debug, Clone)]
struct JobResourceInfo {
    /// Host directory for the job's artifacts; `None` when the job declares no artifact paths.
    artifact_dir: Option<String>,
    /// Declared artifact paths, rendered as strings.
    artifact_paths: Vec<String>,
    /// Cache entries described for this job (key, policy, host path, paths).
    caches: Vec<HistoryCache>,
    /// Filled from runtime state later; always `None` at collection time.
    container_name: Option<String>,
    /// Filled from runtime state later; always `None` at collection time.
    service_network: Option<String>,
    /// Filled from runtime state later; empty at collection time.
    service_containers: Vec<String>,
    /// Filled from runtime state later; always `None` at collection time.
    runtime_summary_path: Option<String>,
    /// Names of user-visible environment variables (see `visible_job_env_vars`).
    env_vars: Vec<String>,
}
76
77impl ExecutorCore {
    /// Build the executor core for a fresh run.
    ///
    /// Parses the pipeline, generates a run id, (re)creates the per-run
    /// session/scripts/logs/cache directories on disk, loads history and
    /// secrets, assembles the shared environment, and performs registry
    /// logins. Fails on unreadable pipeline specs or filesystem errors.
    pub fn new(config: ExecutorConfig) -> Result<Self> {
        let pipeline =
            PipelineSpec::from_path_with_gitlab(&config.pipeline, config.gitlab.as_ref())?;
        let run_id = generate_run_id(&config);
        let runs_root = runtime::runs_root();
        fs::create_dir_all(&runs_root)
            .with_context(|| format!("failed to create {:?}", runs_root))?;

        // Start each run with a clean session directory.
        let session_dir = runtime::session_dir(&run_id);
        if session_dir.exists() {
            fs::remove_dir_all(&session_dir)
                .with_context(|| format!("failed to clean {:?}", session_dir))?;
        }
        fs::create_dir_all(&session_dir)
            .with_context(|| format!("failed to create {:?}", session_dir))?;

        let scripts_dir = session_dir.join("scripts");
        fs::create_dir_all(&scripts_dir)
            .with_context(|| format!("failed to create {:?}", scripts_dir))?;

        let logs_dir = runtime::logs_dir(&run_id);
        fs::create_dir_all(&logs_dir)
            .with_context(|| format!("failed to create {:?}", logs_dir))?;

        let history_store = history_store::HistoryStore::load(runtime::history_path());

        let use_color = should_use_color();
        // OPAL_DEBUG=1 or OPAL_DEBUG=true also enables script tracing.
        let env_verbose = env::var_os("OPAL_DEBUG")
            .map(|val| {
                let s = val.to_string_lossy();
                s == "1" || s.eq_ignore_ascii_case("true")
            })
            .unwrap_or(false);
        let verbose_scripts = config.trace_scripts || env_verbose;
        let mut env_vars = collect_env_vars(&config.env_includes)?;
        // shared_env layering: process env, then included vars, then secrets.
        let mut shared_env: HashMap<String, String> = env::vars().collect();
        expand_env_list(&mut env_vars[..], &shared_env);
        shared_env.extend(env_vars.iter().cloned());
        let stage_specs: Vec<(String, usize)> = pipeline
            .stages
            .iter()
            .map(|stage| (stage.name.clone(), stage.jobs.len()))
            .collect();
        let stage_tracker = stage_tracker::StageTracker::new(&stage_specs);

        let secrets = SecretsStore::load(&config.workdir)?;
        shared_env.extend(secrets.env_pairs());
        let artifacts = ArtifactManager::new(session_dir.clone());
        let cache_root = runtime::cache_root();
        fs::create_dir_all(&cache_root)
            .with_context(|| format!("failed to create cache root {:?}", cache_root))?;
        let cache = CacheManager::new(cache_root);
        // External artifact fetching is only available with a GitLab config.
        let external_artifacts = config.gitlab.as_ref().map(|cfg| {
            ExternalArtifactsManager::new(
                session_dir.clone(),
                cfg.base_url.clone(),
                cfg.token.clone(),
            )
        });
        let project_dir = config
            .workdir
            .file_name()
            .and_then(|n| n.to_str())
            .unwrap_or("project");
        let container_workdir = Path::new(CONTAINER_ROOT).join(project_dir);
        let container_session_dir = Path::new("/opal").join(&run_id);

        let core = Self {
            config,
            pipeline,
            use_color,
            scripts_dir,
            logs_dir,
            session_dir,
            container_session_dir,
            run_id,
            verbose_scripts,
            env_vars,
            shared_env,
            container_workdir,
            stage_tracker,
            runtime_state: runtime_state::RuntimeState::default(),
            history_store,
            secrets,
            artifacts,
            cache,
            external_artifacts,
        };

        // Registry logins may need the fully-constructed core (config + env).
        registry::ensure_registry_logins(&core)?;

        Ok(core)
    }
172
    /// Execute the pipeline end to end.
    ///
    /// Builds the execution plan, optionally starts the TUI, runs the
    /// orchestrator, services post-run restart commands, records the run in
    /// history, and (without the TUI) prints the final summary. Returns the
    /// orchestrator's overall result.
    pub async fn run(&self) -> Result<()> {
        let plan = Arc::new(self.plan_jobs()?);
        let resource_map = self.collect_job_resources(&plan);
        let display = self.display();
        let plan_text = collect_pipeline_plan(&display, &plan).join("\n");
        let ui_resources = Self::convert_ui_resources(&resource_map);
        let history_snapshot = self.history_store.snapshot();
        let ui_handle = if self.config.enable_tui {
            // Map each variant job name back to the source job it was
            // expanded from, so the UI can label variants.
            let variant_sources: HashMap<String, String> = plan
                .variants
                .iter()
                .flat_map(|(source, variants)| {
                    variants
                        .iter()
                        .map(move |variant| (variant.name.clone(), source.clone()))
                })
                .collect();
            let jobs: Vec<UiJobInfo> = plan
                .ordered
                .iter()
                .filter_map(|name| plan.nodes.get(name))
                .map(|planned| UiJobInfo {
                    name: planned.instance.job.name.clone(),
                    source_name: variant_sources
                        .get(&planned.instance.job.name)
                        .cloned()
                        .unwrap_or_else(|| planned.instance.job.name.clone()),
                    stage: planned.instance.stage_name.clone(),
                    log_path: planned.log_path.clone(),
                    log_hash: planned.log_hash.clone(),
                    runner: self.ui_runner_info_for_job(&planned.instance.job),
                })
                .collect();
            Some(UiHandle::start(
                jobs,
                history_snapshot,
                self.run_id.clone(),
                ui_resources,
                plan_text,
                self.config.workdir.clone(),
                self.config.pipeline.clone(),
            )?)
        } else {
            None
        };
        // Without the TUI there is no command channel; create one so that
        // OPAL_ABORT_AFTER_SECS (when set to a positive integer) can inject
        // an abort after the given delay.
        let mut owned_command_rx = if !self.config.enable_tui {
            let (tx, rx) = mpsc::unbounded_channel();
            if let Ok(raw) = env::var("OPAL_ABORT_AFTER_SECS")
                && let Ok(seconds) = raw.parse::<u64>()
                && seconds > 0
            {
                tokio::spawn(async move {
                    tokio::time::sleep(std::time::Duration::from_secs(seconds)).await;
                    let _ = tx.send(crate::ui::UiCommand::AbortPipeline);
                });
            }
            Some(rx)
        } else {
            None
        };
        let mut ui_command_rx = ui_handle
            .as_ref()
            .and_then(|handle| handle.command_receiver());
        // Prefer the TUI's command stream; otherwise use the owned fallback.
        let command_rx = ui_command_rx.as_mut().or(owned_command_rx.as_mut());
        let ui_bridge = ui_handle.as_ref().map(|handle| Arc::new(handle.bridge()));

        let (mut summaries, result) =
            orchestrator::execute_plan(self, plan.clone(), ui_bridge.clone(), command_rx).await;

        if let Some(handle) = &ui_handle {
            handle.pipeline_finished();
        }

        // With a TUI attached, keep servicing restart commands after the
        // first pass so re-run results update the same summaries.
        if let Some(commands) = ui_command_rx.as_mut() {
            orchestrator::handle_restart_commands(
                self,
                plan.clone(),
                ui_bridge.clone(),
                commands,
                &mut summaries,
            )
            .await?;
        }

        let history_entry = self.record_pipeline_history(&summaries, &resource_map);
        if let (Some(entry), Some(ui)) = (history_entry, ui_bridge.as_deref()) {
            ui.history_updated(entry);
        }

        if let Some(handle) = ui_handle {
            handle.wait_for_exit();
        }

        if !self.config.enable_tui {
            print_pipeline_summary(
                &display,
                &plan,
                &summaries,
                &self.session_dir,
                display::print_line,
            );
        }
        result
    }
277
    /// Compile the pipeline into an [`ExecutionPlan`] for this run.
    ///
    /// Applies, in order: tag-context validation, pipeline-level filters,
    /// workflow rules, compilation, and restriction to `selected_jobs` when
    /// any were given. Returns an empty plan when the filters or workflow
    /// rules exclude the entire pipeline.
    fn plan_jobs(&self) -> Result<ExecutionPlan> {
        // OPAL_RUN_MANUAL=1 opts manual jobs into the run.
        let run_manual = env::var("OPAL_RUN_MANUAL").is_ok_and(|v| v == "1");
        let ctx = RuleContext::from_env(&self.config.workdir, self.shared_env.clone(), run_manual);
        ctx.ensure_valid_tag_context()?;
        if !pipeline::rules::filters_allow(&self.pipeline.filters, &ctx) {
            return Ok(empty_execution_plan());
        }
        if let Some(workflow) = &self.pipeline.workflow
            && !pipeline::rules::evaluate_workflow(&workflow.rules, &ctx)?
        {
            return Ok(empty_execution_plan());
        }
        let compiled = compile_pipeline(&self.pipeline, Some(&ctx))?;
        let mut plan = build_execution_plan(compiled, |job| self.job_log_info(job))?;
        if !self.config.selected_jobs.is_empty() {
            plan = plan.select_jobs(&self.config.selected_jobs)?;
        }
        Ok(plan)
    }
297
    /// Pre-compute per-job resource descriptions (artifact locations, cache
    /// entries, visible env var names) keyed by job name.
    ///
    /// Runtime-only fields (container name, service network/containers,
    /// runtime summary path) are left empty here and merged in later from
    /// `runtime_state` by `record_pipeline_history`.
    fn collect_job_resources(&self, plan: &ExecutionPlan) -> HashMap<String, JobResourceInfo> {
        plan.nodes
            .values()
            .map(|planned| {
                // Only jobs that declare artifact paths get an artifact dir.
                let artifact_dir = if planned.instance.job.artifacts.paths.is_empty() {
                    None
                } else {
                    Some(
                        self.artifacts
                            .job_artifacts_root(&planned.instance.job.name)
                            .display()
                            .to_string(),
                    )
                };
                let artifact_paths = planned
                    .instance
                    .job
                    .artifacts
                    .paths
                    .iter()
                    .map(|path| path.display().to_string())
                    .collect();
                // Cache keys may reference job env vars, so describe entries
                // against the job's full environment.
                let env_vars = self.job_env(&planned.instance.job);
                let cache_env: HashMap<String, String> = env_vars.iter().cloned().collect();
                let caches = self
                    .cache
                    .describe_entries(
                        &planned.instance.job.cache,
                        &self.config.workdir,
                        &cache_env,
                    )
                    .into_iter()
                    .map(|entry| HistoryCache {
                        key: entry.key,
                        policy: cache_policy_label(entry.policy).to_string(),
                        host: entry.host.display().to_string(),
                        paths: entry
                            .paths
                            .iter()
                            .map(|path| path.display().to_string())
                            .collect(),
                    })
                    .collect();
                (
                    planned.instance.job.name.clone(),
                    JobResourceInfo {
                        artifact_dir,
                        artifact_paths,
                        caches,
                        container_name: None,
                        service_network: None,
                        service_containers: Vec::new(),
                        runtime_summary_path: None,
                        env_vars: self.visible_job_env_vars(&planned.instance.job),
                    },
                )
            })
            .collect()
    }
358
359 fn convert_ui_resources(
360 resources: &HashMap<String, JobResourceInfo>,
361 ) -> HashMap<String, UiJobResources> {
362 resources
363 .iter()
364 .map(|(name, info)| {
365 (
366 name.clone(),
367 UiJobResources {
368 artifact_dir: info.artifact_dir.clone(),
369 artifact_paths: info.artifact_paths.clone(),
370 caches: info.caches.clone(),
371 container_name: info.container_name.clone(),
372 service_network: info.service_network.clone(),
373 service_containers: info.service_containers.clone(),
374 runtime_summary_path: info.runtime_summary_path.clone(),
375 env_vars: info.env_vars.clone(),
376 },
377 )
378 })
379 .collect()
380 }
381
    /// Persist this run's outcome to the history store.
    ///
    /// Merges statically-collected resource info with the runtime objects
    /// recorded while jobs ran; runtime values take precedence when present.
    /// Returns the newly recorded history entry, if the store produced one.
    fn record_pipeline_history(
        &self,
        summaries: &[JobSummary],
        resources: &HashMap<String, JobResourceInfo>,
    ) -> Option<HistoryEntry> {
        let resource_map = resources
            .iter()
            .map(|(name, info)| {
                let runtime = self.runtime_state.runtime_objects(name);
                (
                    name.clone(),
                    history_store::HistoryResources {
                        artifact_dir: info.artifact_dir.clone(),
                        artifacts: info.artifact_paths.clone(),
                        caches: info.caches.clone(),
                        // For each runtime-tracked field: prefer the value
                        // observed at run time, fall back to the static one.
                        container_name: runtime
                            .as_ref()
                            .and_then(|objects| objects.container_name.clone())
                            .or_else(|| info.container_name.clone()),
                        service_network: runtime
                            .as_ref()
                            .and_then(|objects| objects.service_network.clone())
                            .or_else(|| info.service_network.clone()),
                        service_containers: runtime
                            .as_ref()
                            .map(|objects| objects.service_containers.clone())
                            .unwrap_or_else(|| info.service_containers.clone()),
                        runtime_summary_path: runtime
                            .as_ref()
                            .and_then(|objects| objects.runtime_summary_path.clone())
                            .or_else(|| info.runtime_summary_path.clone()),
                        env_vars: info.env_vars.clone(),
                    },
                )
            })
            .collect();
        self.history_store
            .record(&self.run_id, summaries, &resource_map)
    }
421
    /// Record the container/network objects created for a job so they can be
    /// shown in the UI and persisted to history. Delegates to `runtime_state`.
    pub(crate) fn record_runtime_objects(
        &self,
        job_name: &str,
        container_name: String,
        service_network: Option<String>,
        service_containers: Vec<String>,
        runtime_summary_path: Option<String>,
    ) {
        self.runtime_state.record_runtime_objects(
            job_name,
            container_name,
            service_network,
            service_containers,
            runtime_summary_path,
        );
    }
438
    /// Write the per-job runtime summary file; returns its path when one was
    /// written. Delegates to `runtime_summary::write_runtime_summary`.
    pub(crate) fn write_runtime_summary(
        &self,
        job_name: &str,
        container_name: &str,
        service_network: Option<&str>,
        service_containers: &[String],
    ) -> Result<Option<String>> {
        runtime_summary::write_runtime_summary(
            self,
            job_name,
            container_name,
            service_network,
            service_containers,
        )
    }
454
    /// Announce a job's start (console and/or UI) and obtain its run info.
    /// Delegates to `launch::log_job_start`.
    pub(crate) fn log_job_start(
        &self,
        planned: &ExecutableJob,
        ui: Option<&UiBridge>,
    ) -> Result<JobRunInfo> {
        launch::log_job_start(self, planned, ui)
    }
462
    /// Prepare everything a job needs before its container starts.
    /// Delegates to `preparer::prepare_job_run`.
    pub(crate) fn prepare_job_run(
        &self,
        plan: &ExecutionPlan,
        job: &JobSpec,
    ) -> Result<preparer::PreparedJobRun> {
        preparer::prepare_job_run(self, plan, job)
    }
470
    /// Collect a job's untracked-file artifacts from its workspace.
    /// Delegates to the artifact manager.
    pub(crate) fn collect_untracked_artifacts(
        &self,
        job: &JobSpec,
        workspace: &Path,
    ) -> Result<()> {
        self.artifacts.collect_untracked(job, workspace)
    }
478
    /// Collect a job's declared artifact paths from its workspace, resolving
    /// container paths via the mounts and the container workdir.
    pub(crate) fn collect_declared_artifacts(
        &self,
        job: &JobSpec,
        workspace: &Path,
        mounts: &[crate::pipeline::VolumeMount],
    ) -> Result<()> {
        self.artifacts
            .collect_declared(job, workspace, mounts, &self.container_workdir)
    }
488
    /// Collect a job's dotenv report artifacts, resolving container paths via
    /// the mounts and the container workdir.
    pub(crate) fn collect_dotenv_artifacts(
        &self,
        job: &JobSpec,
        workspace: &Path,
        mounts: &[crate::pipeline::VolumeMount],
    ) -> Result<()> {
        self.artifacts
            .collect_dotenv_report(job, workspace, mounts, &self.container_workdir)
    }
498
    /// Forget the running-container record for a job (e.g. after it exits).
    pub(crate) fn clear_running_container(&self, job_name: &str) {
        self.runtime_state.clear_running_container(job_name);
    }
502
    /// Record that a job finished, together with its artifact-source outcome.
    pub(crate) fn record_completed_job(&self, job_name: &str, outcome: ArtifactSourceOutcome) {
        self.runtime_state.record_completed_job(job_name, outcome);
    }
506
    /// Snapshot of completed jobs and their artifact-source outcomes.
    pub(crate) fn completed_jobs(&self) -> HashMap<String, ArtifactSourceOutcome> {
        self.runtime_state.completed_jobs()
    }
510
    /// Consume a job's cancelled flag, returning whether it had been set.
    pub(crate) fn take_cancelled_job(&self, job_name: &str) -> bool {
        self.runtime_state.take_cancelled_job(job_name)
    }
514
515 pub(crate) fn cancel_running_job(&self, job_name: &str) -> bool {
516 let container = self.runtime_state.running_container(job_name);
517 if let Some(container_name) = container {
518 self.runtime_state.mark_job_cancelled(job_name);
519 self.kill_container(job_name, &container_name);
520 true
521 } else {
522 false
523 }
524 }
525
    /// Execute a prepared job process. Delegates to `process::execute`.
    pub(crate) fn execute(&self, ctx: ExecuteContext<'_>) -> Result<()> {
        process::execute(self, ctx)
    }
529
    /// Print end-of-job console lines (script/log locations, elapsed time)
    /// and, when this job completes its stage, the stage footer with the
    /// stage's total time. No-op while the TUI is active, which renders
    /// completion itself.
    pub(crate) fn print_job_completion(
        &self,
        stage_name: &str,
        script_path: &Path,
        log_path: &Path,
        elapsed: f32,
    ) {
        if !self.config.enable_tui {
            let display = self.display();
            display::print_line(format!(" script stored at {}", script_path.display()));
            display::print_line(format!(" log file stored at {}", log_path.display()));
            let finish_label = display.bold_green(" ✓ finished in");
            display::print_line(format!("{} {:.2}s", finish_label, elapsed));

            // `complete_job` returns the stage's elapsed time only when this
            // was the stage's last outstanding job.
            if let Some(stage_elapsed) = self.stage_tracker.complete_job(stage_name) {
                let stage_footer = display.bold_blue("╰─ stage complete in");
                display::print_line(format!("{stage_footer} {:.2}s", stage_elapsed));
            }
        }
    }
550
    /// Kill a job's container. Delegates to `lifecycle::kill_container`.
    pub(crate) fn kill_container(&self, job_name: &str, container_name: &str) {
        lifecycle::kill_container(self, job_name, container_name);
    }
554
    /// Clean up a container that has finished running.
    /// Delegates to `lifecycle::cleanup_finished_container`.
    pub(crate) fn cleanup_finished_container(&self, container_name: &str) {
        lifecycle::cleanup_finished_container(self, container_name);
    }
558
    /// Resolve a job's image spec, expanding references against the given env
    /// lookup when provided. Delegates to `launch::resolve_job_image_with_env`.
    fn resolve_job_image_with_env(
        &self,
        job: &JobSpec,
        env_lookup: Option<&HashMap<String, String>>,
    ) -> Result<crate::model::ImageSpec> {
        launch::resolve_job_image_with_env(self, job, env_lookup)
    }
566
    /// Build the complete environment for a job.
    ///
    /// Delegates to `build_job_env` with the included env vars, pipeline
    /// default variables, secrets, this run's host/container directories,
    /// the run id, and the shared environment.
    fn job_env(&self, job: &JobSpec) -> Vec<(String, String)> {
        build_job_env(
            &self.env_vars,
            &self.pipeline.defaults.variables,
            job,
            &self.secrets,
            &self.config.workdir,
            &self.container_workdir,
            Path::new(CONTAINER_ROOT),
            &self.run_id,
            &self.shared_env,
        )
    }
580
581 fn visible_job_env_vars(&self, job: &JobSpec) -> Vec<String> {
582 let mut vars = BTreeSet::new();
583 for (key, _) in self.job_env(job) {
584 if is_user_visible_job_env(&key) {
585 vars.insert(key);
586 }
587 }
588 vars.into_iter().collect()
589 }
590
    /// Expand variable references in a job's `environment:` spec using the
    /// job's full environment. Returns `None` when the job declares none.
    pub(crate) fn expanded_environment(
        &self,
        job: &JobSpec,
    ) -> Option<crate::model::EnvironmentSpec> {
        let environment = job.environment.as_ref()?;
        let lookup: HashMap<String, String> = self.job_env(job).into_iter().collect();
        Some(crate::env::expand_environment(environment, &lookup))
    }
599
    /// Fresh formatter honoring this run's color setting.
    fn display(&self) -> DisplayFormatter {
        DisplayFormatter::new(self.use_color)
    }
603
    /// Run an AI analysis of a job using the configured default provider.
    ///
    /// Streams text chunks to the UI as they arrive, optionally saves the
    /// analysis under the session dir, and reports success or failure back to
    /// the UI. Errors are not propagated — they are surfaced via the UI only
    /// (this method returns `()`).
    pub(crate) fn analyze_job_with_default_provider(
        &self,
        plan: &ExecutionPlan,
        job_name: &str,
        source_name: &str,
        ui: Option<&UiBridge>,
    ) {
        // Ollama is the fallback when no default provider is configured.
        let provider = self
            .config
            .settings
            .ai_settings()
            .default_provider
            .unwrap_or(crate::config::AiProviderConfig::Ollama);
        let provider_kind = match provider {
            crate::config::AiProviderConfig::Ollama => AiProviderKind::Ollama,
            crate::config::AiProviderConfig::Claude => AiProviderKind::Claude,
            crate::config::AiProviderConfig::Codex => AiProviderKind::Codex,
        };
        let provider_label = match provider_kind {
            AiProviderKind::Ollama => "ollama",
            AiProviderKind::Claude => "claude",
            AiProviderKind::Codex => "codex",
        };
        if let Some(ui) = ui {
            ui.analysis_started(job_name, provider_label);
        }

        // Immediately-invoked closure gathers all fallible steps so a single
        // Result drives the success/failure notification below.
        let outcome = (|| -> Result<Option<PathBuf>> {
            // Ollama requires an explicitly configured model.
            if provider_kind == AiProviderKind::Ollama
                && self
                    .config
                    .settings
                    .ai_settings()
                    .ollama
                    .model
                    .trim()
                    .is_empty()
            {
                anyhow::bail!(
                    "Ollama analysis requires [ai.ollama].model in config; Opal does not choose a default model for you"
                );
            }
            let rendered = self.render_ai_prompt_parts(plan, job_name, source_name)?;
            // Mask secrets before any text leaves the process.
            let prompt = self.secrets.mask_fragment(&rendered.prompt);
            let system = rendered
                .system
                .as_deref()
                .map(|text| self.secrets.mask_fragment(text).into_owned());

            // Destination for the saved analysis, when saving is enabled.
            let save_path = self.config.settings.ai_settings().save_analysis.then(|| {
                self.session_dir
                    .join(job_name_slug(job_name))
                    .join("analysis")
                    .join(format!("{provider_label}.md"))
            });

            // host/model/command/workdir are provider-specific knobs; only
            // the relevant provider's fields are populated.
            let request = AiRequest {
                provider: provider_kind,
                prompt: prompt.into_owned(),
                system,
                host: (provider_kind == AiProviderKind::Ollama)
                    .then(|| self.config.settings.ai_settings().ollama.host.clone()),
                model: match provider_kind {
                    AiProviderKind::Ollama => {
                        Some(self.config.settings.ai_settings().ollama.model.clone())
                    }
                    AiProviderKind::Codex => self.config.settings.ai_settings().codex.model.clone(),
                    AiProviderKind::Claude => None,
                },
                command: (provider_kind == AiProviderKind::Codex)
                    .then(|| self.config.settings.ai_settings().codex.command.clone()),
                args: Vec::new(),
                workdir: (provider_kind == AiProviderKind::Codex)
                    .then(|| self.config.workdir.clone()),
                save_path: save_path.clone(),
            };

            // Stream text chunks straight to the UI while the provider runs.
            let result = ai::analyze_with_default_provider(&request, |chunk| {
                if let (Some(ui), ai::AiChunk::Text(text)) = (ui, chunk) {
                    ui.analysis_chunk(job_name, &text);
                }
            })
            .map_err(|err| anyhow::anyhow!(err.message))?;

            if let Some(path) = &save_path {
                if let Some(parent) = path.parent() {
                    fs::create_dir_all(parent)?;
                }
                fs::write(path, result.text)?;
            }

            Ok(save_path)
        })();

        if let Some(ui) = ui {
            match outcome {
                Ok(saved_path) => ui.analysis_finished(
                    job_name,
                    // Re-read the saved file for the final text; empty string
                    // when nothing was saved to disk.
                    if let Some(path) = &saved_path {
                        fs::read_to_string(path).unwrap_or_default()
                    } else {
                        String::new()
                    },
                    saved_path,
                    None,
                ),
                Err(err) => {
                    ui.analysis_finished(job_name, String::new(), None, Some(err.to_string()))
                }
            }
        }
    }
716
717 pub(crate) fn render_ai_prompt(
718 &self,
719 plan: &ExecutionPlan,
720 job_name: &str,
721 source_name: &str,
722 ) -> Result<String> {
723 let rendered = self.render_ai_prompt_parts(plan, job_name, source_name)?;
724 let mut text = String::new();
725 if let Some(system) = rendered.system {
726 text.push_str("# System\n\n");
727 text.push_str(system.trim());
728 text.push_str("\n\n");
729 }
730 text.push_str("# Prompt\n\n");
731 text.push_str(rendered.prompt.trim());
732 text.push('\n');
733 Ok(text)
734 }
735
    /// Build the AI context for a job and render it into prompt parts
    /// (system + prompt) via `render_job_analysis_prompt`.
    fn render_ai_prompt_parts(
        &self,
        plan: &ExecutionPlan,
        job_name: &str,
        source_name: &str,
    ) -> Result<crate::ai::RenderedPrompt> {
        let context = self.build_ai_context(plan, job_name, source_name)?;
        render_job_analysis_prompt(
            &self.config.workdir,
            self.config.settings.ai_settings(),
            &context,
        )
    }
749
    /// Assemble the [`AiContext`] for a job's analysis prompt.
    ///
    /// Pulls together runner info, the job's YAML fragment, a textual
    /// dependency/needs summary, the runtime summary file (when present),
    /// and the tail of the job log. Fails when `job_name` is not in the plan
    /// or the log cannot be read.
    fn build_ai_context(
        &self,
        plan: &ExecutionPlan,
        job_name: &str,
        source_name: &str,
    ) -> Result<AiContext> {
        let planned = plan
            .nodes
            .get(job_name)
            .with_context(|| format!("selected job '{job_name}' not found in execution plan"))?;
        let runner = self.ui_runner_info_for_job(&planned.instance.job);
        let runner_summary = format!(
            "engine={} arch={} vcpu={} ram={}",
            runner.engine,
            runner.arch.unwrap_or_else(|| "native/default".to_string()),
            runner.cpus.unwrap_or_else(|| "engine default".to_string()),
            runner
                .memory
                .unwrap_or_else(|| "engine default".to_string())
        );
        let job_yaml = self.load_job_yaml_fragment(source_name)?;
        let pipeline_summary = format!(
            "dependencies: {}\nneeds: {}\nallow_failure: {}\ninterruptible: {}",
            if planned.instance.dependencies.is_empty() {
                "none".to_string()
            } else {
                planned.instance.dependencies.join(", ")
            },
            if planned.instance.job.needs.is_empty() {
                "none".to_string()
            } else {
                planned
                    .instance
                    .job
                    .needs
                    .iter()
                    .map(|need| {
                        // Mark needs that also pull the dependency's artifacts.
                        if need.needs_artifacts {
                            format!("{} (artifacts)", need.job)
                        } else {
                            need.job.clone()
                        }
                    })
                    .collect::<Vec<_>>()
                    .join(", ")
            },
            planned.instance.rule.allow_failure,
            planned.instance.interruptible,
        );
        // Best-effort: a missing/unreadable runtime summary yields None.
        let runtime_summary = self
            .runtime_state
            .runtime_objects(job_name)
            .and_then(|objects| objects.runtime_summary_path)
            .and_then(|path| fs::read_to_string(path).ok());
        let log_excerpt = self.read_job_log_excerpt(&planned.log_path)?;

        Ok(AiContext {
            job_name: job_name.to_string(),
            source_name: source_name.to_string(),
            stage: planned.instance.stage_name.clone(),
            job_yaml,
            runner_summary,
            pipeline_summary,
            runtime_summary,
            log_excerpt,
            failure_hint: None,
        })
    }
818
819 fn load_job_yaml_fragment(&self, source_name: &str) -> Result<String> {
820 let content = fs::read_to_string(&self.config.pipeline)
821 .with_context(|| format!("failed to read {}", self.config.pipeline.display()))?;
822 let yaml: serde_yaml::Value = serde_yaml::from_str(&content)
823 .with_context(|| format!("failed to parse {}", self.config.pipeline.display()))?;
824 let Some(mapping) = yaml.as_mapping() else {
825 return Ok(format!("# job '{source_name}' not found"));
826 };
827 for (key, value) in mapping {
828 if key.as_str() == Some(source_name) {
829 let mut root = serde_yaml::Mapping::new();
830 root.insert(
831 serde_yaml::Value::String(source_name.to_string()),
832 value.clone(),
833 );
834 return Ok(serde_yaml::to_string(&serde_yaml::Value::Mapping(root))?);
835 }
836 }
837 Ok(format!("# job '{source_name}' not found"))
838 }
839
840 fn read_job_log_excerpt(&self, path: &Path) -> Result<String> {
841 let content = fs::read_to_string(path)
842 .with_context(|| format!("failed to read log {}", path.display()))?;
843 let tail_lines = self.config.settings.ai_settings().tail_lines.max(50);
844 let lines: Vec<&str> = content.lines().collect();
845 let start = lines.len().saturating_sub(tail_lines);
846 Ok(lines[start..].join("\n"))
847 }
848
    /// Describe the runner (engine, architecture, cpu/memory limits) for a
    /// job, for display in the UI and in AI prompts.
    fn ui_runner_info_for_job(&self, job: &JobSpec) -> crate::ui::types::UiRunnerInfo {
        let engine = match self.config.engine {
            EngineKind::ContainerCli => "container",
            EngineKind::Docker => "docker",
            EngineKind::Podman => "podman",
            EngineKind::Nerdctl => "nerdctl",
            EngineKind::Orbstack => "orbstack",
        }
        .to_string();

        let job_override = self.config.settings.job_override_for(&job.name);
        let image_platform = job
            .image
            .as_ref()
            .and_then(|image| image.docker_platform.as_deref());
        // Arch resolution order differs per engine: the container CLI checks
        // per-job override, global container settings, OPAL_CONTAINER_ARCH,
        // then the image platform; other engines let the image platform win
        // before the per-job override. Both fall back to the host arch.
        let arch = match self.config.engine {
            EngineKind::ContainerCli => job_override
                .as_ref()
                .and_then(|cfg| cfg.arch.clone())
                .or_else(|| {
                    self.config
                        .settings
                        .container_settings()
                        .and_then(|cfg| cfg.arch.clone())
                })
                .or_else(|| std::env::var("OPAL_CONTAINER_ARCH").ok())
                .or_else(|| image_platform.and_then(container_arch_from_platform))
                .or_else(|| normalize_container_arch(std::env::consts::ARCH)),
            _ => image_platform
                .and_then(container_arch_from_platform)
                .or_else(|| job_override.as_ref().and_then(|cfg| cfg.arch.clone()))
                .or_else(|| normalize_container_arch(std::env::consts::ARCH)),
        };

        // Only the container CLI reports cpu/memory limits; defaults are
        // "4" vCPUs and "1638m" memory when not configured.
        let (cpus, memory) = match self.config.engine {
            EngineKind::ContainerCli => {
                let settings = self.config.settings.container_settings();
                (
                    Some(
                        settings
                            .and_then(|cfg| cfg.cpus.clone())
                            .unwrap_or_else(|| "4".to_string()),
                    ),
                    Some(
                        settings
                            .and_then(|cfg| cfg.memory.clone())
                            .unwrap_or_else(|| "1638m".to_string()),
                    ),
                )
            }
            _ => (None, None),
        };

        crate::ui::types::UiRunnerInfo {
            engine,
            arch,
            cpus,
            memory,
        }
    }
909
    /// Log path and hash for a job. Delegates to `logging::job_log_info`.
    fn job_log_info(&self, job: &JobSpec) -> (PathBuf, String) {
        logging::job_log_info(&self.logs_dir, &self.run_id, job)
    }
913
    /// Translate a host path into its in-container equivalent by rebasing it
    /// against the session-dir and project-workdir mappings.
    fn container_path_rel(&self, host_path: &Path) -> Result<PathBuf> {
        paths::to_container_path(
            host_path,
            &[
                (&*self.session_dir, &*self.container_session_dir),
                (&*self.config.workdir, &*self.container_workdir),
            ],
        )
    }
923}
924
/// Whether an environment variable name should be surfaced to the user.
///
/// Internal variables — `CI`, `PATH`, `PWD`, `SHLVL`, `GITLAB_CI`,
/// `OPAL_IN_OPAL`, `_`, and anything prefixed with `CI_`, `GITLAB_`, or
/// `OPAL_` — are hidden; everything else is visible.
fn is_user_visible_job_env(key: &str) -> bool {
    const HIDDEN_EXACT: [&str; 7] = ["CI", "PATH", "PWD", "SHLVL", "GITLAB_CI", "OPAL_IN_OPAL", "_"];
    const HIDDEN_PREFIXES: [&str; 3] = ["CI_", "GITLAB_", "OPAL_"];
    let hidden = HIDDEN_EXACT.contains(&key)
        || HIDDEN_PREFIXES
            .iter()
            .any(|prefix| key.starts_with(prefix));
    !hidden
}
937
938fn empty_execution_plan() -> ExecutionPlan {
939 ExecutionPlan {
940 ordered: Vec::new(),
941 nodes: HashMap::new(),
942 dependents: HashMap::new(),
943 order_index: HashMap::new(),
944 variants: HashMap::new(),
945 }
946}
947
/// Human-readable label for a cache policy, as stored in history entries.
fn cache_policy_label(policy: CachePolicySpec) -> &'static str {
    match policy {
        CachePolicySpec::Pull => "pull",
        CachePolicySpec::Push => "push",
        CachePolicySpec::PullPush => "pull-push",
    }
}
955
#[cfg(test)]
mod tests {
    // Placeholder: no unit tests are defined in this module yet.
    }