1mod history_store;
2mod launch;
3mod lifecycle;
4mod preparer;
5mod process;
6mod registry;
7mod runtime_state;
8mod runtime_summary;
9mod stage_tracker;
10mod workspace;
11
12use super::{orchestrator, paths};
13use crate::ai::{self, AiContext, AiProviderKind, AiRequest, render_job_analysis_prompt};
14use crate::compiler::compile_pipeline;
15use crate::display::{self, DisplayFormatter, collect_pipeline_plan, print_pipeline_summary};
16use crate::env::{build_job_env, collect_env_vars, expand_env_list};
17use crate::execution_plan::{ExecutableJob, ExecutionPlan, build_execution_plan};
18use crate::executor::container_arch::{container_arch_from_platform, normalize_container_arch};
19use crate::history::{HistoryCache, HistoryEntry};
20use crate::logging;
21use crate::model::{ArtifactSourceOutcome, CachePolicySpec, JobSpec, PipelineSpec};
22use crate::naming::{generate_run_id, job_name_slug};
23use crate::pipeline::{
24 self, ArtifactManager, CacheManager, ExternalArtifactsManager, JobRunInfo, JobSummary,
25 RuleContext,
26};
27use crate::runner::ExecuteContext;
28use crate::secrets::SecretsStore;
29use crate::terminal::should_use_color;
30use crate::ui::{UiBridge, UiHandle, UiJobInfo, UiJobResources};
31use crate::{EngineKind, ExecutorConfig, runtime};
32use anyhow::{Context, Result};
33use std::collections::HashMap;
34use std::env;
35use std::fs;
36use std::path::{Path, PathBuf};
37use std::sync::Arc;
38use tokio::sync::mpsc;
39
/// Base path inside job containers; `new` joins it with the project
/// directory name to form `container_workdir` (`/builds/<project>`).
pub(super) const CONTAINER_ROOT: &str = "/builds";
41
/// Central state for one pipeline run: the parsed pipeline, per-run
/// directories, the merged environment, and the managers (artifacts,
/// caches, secrets, history) shared by the job-execution helpers.
#[derive(Debug, Clone)]
pub struct ExecutorCore {
    pub config: ExecutorConfig,
    // Pipeline parsed from `config.pipeline` (with optional GitLab context).
    pipeline: PipelineSpec,
    // Whether terminal output is colorized (from `should_use_color`).
    use_color: bool,
    // Per-run directory where generated job scripts are stored.
    scripts_dir: PathBuf,
    // Per-run directory for job log files.
    logs_dir: PathBuf,
    // Host-side session directory for this run (recreated in `new`).
    session_dir: PathBuf,
    // Container-side counterpart of `session_dir` (`/opal/<run_id>`).
    container_session_dir: PathBuf,
    // Unique identifier for this run (see `generate_run_id`).
    run_id: String,
    // Script tracing enabled via config flag or the OPAL_DEBUG env var.
    verbose_scripts: bool,
    // Variables collected from `config.env_includes`, after expansion.
    env_vars: Vec<(String, String)>,
    // Process env + `env_vars` + secrets; used for rules and job env.
    shared_env: HashMap<String, String>,
    // Container-side project workdir (`/builds/<project_dir>`).
    container_workdir: PathBuf,
    // Tracks per-stage job completion for timing output.
    stage_tracker: stage_tracker::StageTracker,
    // Mutable run state: running containers, completed/cancelled jobs.
    runtime_state: runtime_state::RuntimeState,
    // Persistent run history (loaded from `runtime::history_path()`).
    history_store: history_store::HistoryStore,
    secrets: SecretsStore,
    artifacts: ArtifactManager,
    cache: CacheManager,
    // Present only when a GitLab config is supplied.
    external_artifacts: Option<ExternalArtifactsManager>,
}
64
/// Per-job resource description (artifact locations, caches, runtime
/// object names) collected at planning time and later merged with live
/// runtime state for history recording and UI display.
#[derive(Debug, Clone)]
struct JobResourceInfo {
    // Host path of the job's artifact root; None if no artifact paths declared.
    artifact_dir: Option<String>,
    // Declared artifact paths rendered as strings.
    artifact_paths: Vec<String>,
    caches: Vec<HistoryCache>,
    // The following are None/empty at planning time and are supplied
    // from `runtime_state` when history is recorded.
    container_name: Option<String>,
    service_network: Option<String>,
    service_containers: Vec<String>,
    runtime_summary_path: Option<String>,
}
75
76impl ExecutorCore {
    /// Builds the executor state for one run: parses the pipeline,
    /// (re)creates the per-run session/scripts/logs directories, loads
    /// history and secrets, merges the environment, and performs any
    /// required registry logins.
    ///
    /// # Errors
    /// Fails if the pipeline cannot be parsed, any run directory cannot
    /// be created/cleaned, env includes or secrets fail to load, or a
    /// registry login fails.
    pub fn new(config: ExecutorConfig) -> Result<Self> {
        let pipeline =
            PipelineSpec::from_path_with_gitlab(&config.pipeline, config.gitlab.as_ref())?;
        let run_id = generate_run_id(&config);
        let runs_root = runtime::runs_root();
        fs::create_dir_all(&runs_root)
            .with_context(|| format!("failed to create {:?}", runs_root))?;

        // A stale session dir from a previous run with the same id is wiped
        // so artifacts/scripts from that run cannot leak into this one.
        let session_dir = runtime::session_dir(&run_id);
        if session_dir.exists() {
            fs::remove_dir_all(&session_dir)
                .with_context(|| format!("failed to clean {:?}", session_dir))?;
        }
        fs::create_dir_all(&session_dir)
            .with_context(|| format!("failed to create {:?}", session_dir))?;

        let scripts_dir = session_dir.join("scripts");
        fs::create_dir_all(&scripts_dir)
            .with_context(|| format!("failed to create {:?}", scripts_dir))?;

        let logs_dir = runtime::logs_dir(&run_id);
        fs::create_dir_all(&logs_dir)
            .with_context(|| format!("failed to create {:?}", logs_dir))?;

        let history_store = history_store::HistoryStore::load(runtime::history_path());

        let use_color = should_use_color();
        // OPAL_DEBUG=1 or OPAL_DEBUG=true enables script tracing.
        let env_verbose = env::var_os("OPAL_DEBUG")
            .map(|val| {
                let s = val.to_string_lossy();
                s == "1" || s.eq_ignore_ascii_case("true")
            })
            .unwrap_or(false);
        let verbose_scripts = config.trace_scripts || env_verbose;
        // Included env files are expanded against the process environment,
        // then layered on top of it (and later the secrets) in shared_env.
        let mut env_vars = collect_env_vars(&config.env_includes)?;
        let mut shared_env: HashMap<String, String> = env::vars().collect();
        expand_env_list(&mut env_vars[..], &shared_env);
        shared_env.extend(env_vars.iter().cloned());
        let stage_specs: Vec<(String, usize)> = pipeline
            .stages
            .iter()
            .map(|stage| (stage.name.clone(), stage.jobs.len()))
            .collect();
        let stage_tracker = stage_tracker::StageTracker::new(&stage_specs);

        let secrets = SecretsStore::load(&config.workdir)?;
        shared_env.extend(secrets.env_pairs());
        let artifacts = ArtifactManager::new(session_dir.clone());
        let cache_root = runtime::cache_root();
        fs::create_dir_all(&cache_root)
            .with_context(|| format!("failed to create cache root {:?}", cache_root))?;
        let cache = CacheManager::new(cache_root);
        let external_artifacts = config.gitlab.as_ref().map(|cfg| {
            ExternalArtifactsManager::new(
                session_dir.clone(),
                cfg.base_url.clone(),
                cfg.token.clone(),
            )
        });
        // Container-side paths mirror the host workdir name under /builds.
        let project_dir = config
            .workdir
            .file_name()
            .and_then(|n| n.to_str())
            .unwrap_or("project");
        let container_workdir = Path::new(CONTAINER_ROOT).join(project_dir);
        let container_session_dir = Path::new("/opal").join(&run_id);

        let core = Self {
            config,
            pipeline,
            use_color,
            scripts_dir,
            logs_dir,
            session_dir,
            container_session_dir,
            run_id,
            verbose_scripts,
            env_vars,
            shared_env,
            container_workdir,
            stage_tracker,
            runtime_state: runtime_state::RuntimeState::default(),
            history_store,
            secrets,
            artifacts,
            cache,
            external_artifacts,
        };

        registry::ensure_registry_logins(&core)?;

        Ok(core)
    }
171
    /// Executes the planned pipeline end to end: builds the execution
    /// plan, optionally starts the TUI, wires an abort/command channel,
    /// runs the orchestrator, services restart commands, records run
    /// history, and (in non-TUI mode) prints the final summary.
    ///
    /// # Errors
    /// Propagates planning errors, TUI startup errors, restart-handling
    /// errors, and the orchestrator's overall result.
    pub async fn run(&self) -> Result<()> {
        let plan = Arc::new(self.plan_jobs()?);
        let resource_map = self.collect_job_resources(&plan);
        let display = self.display();
        let plan_text = collect_pipeline_plan(&display, &plan).join("\n");
        let ui_resources = Self::convert_ui_resources(&resource_map);
        let history_snapshot = self.history_store.snapshot();
        let ui_handle = if self.config.enable_tui {
            // Map each expanded variant job name back to its source job so
            // the UI can group variants under their originating entry.
            let variant_sources: HashMap<String, String> = plan
                .variants
                .iter()
                .flat_map(|(source, variants)| {
                    variants
                        .iter()
                        .map(move |variant| (variant.name.clone(), source.clone()))
                })
                .collect();
            let jobs: Vec<UiJobInfo> = plan
                .ordered
                .iter()
                .filter_map(|name| plan.nodes.get(name))
                .map(|planned| UiJobInfo {
                    name: planned.instance.job.name.clone(),
                    source_name: variant_sources
                        .get(&planned.instance.job.name)
                        .cloned()
                        .unwrap_or_else(|| planned.instance.job.name.clone()),
                    stage: planned.instance.stage_name.clone(),
                    log_path: planned.log_path.clone(),
                    log_hash: planned.log_hash.clone(),
                    runner: self.ui_runner_info_for_job(&planned.instance.job),
                })
                .collect();
            Some(UiHandle::start(
                jobs,
                history_snapshot,
                self.run_id.clone(),
                ui_resources,
                plan_text,
                self.config.workdir.clone(),
                self.config.pipeline.clone(),
            )?)
        } else {
            None
        };
        // Without a TUI there is no command stream, so create one and
        // optionally schedule an abort after OPAL_ABORT_AFTER_SECS seconds.
        let mut owned_command_rx = if !self.config.enable_tui {
            let (tx, rx) = mpsc::unbounded_channel();
            if let Ok(raw) = env::var("OPAL_ABORT_AFTER_SECS")
                && let Ok(seconds) = raw.parse::<u64>()
                && seconds > 0
            {
                tokio::spawn(async move {
                    tokio::time::sleep(std::time::Duration::from_secs(seconds)).await;
                    let _ = tx.send(crate::ui::UiCommand::AbortPipeline);
                });
            }
            Some(rx)
        } else {
            None
        };
        let mut ui_command_rx = ui_handle
            .as_ref()
            .and_then(|handle| handle.command_receiver());
        // Prefer the TUI's command stream; fall back to the owned one.
        let command_rx = ui_command_rx.as_mut().or(owned_command_rx.as_mut());
        let ui_bridge = ui_handle.as_ref().map(|handle| Arc::new(handle.bridge()));

        let (mut summaries, result) =
            orchestrator::execute_plan(self, plan.clone(), ui_bridge.clone(), command_rx).await;

        if let Some(handle) = &ui_handle {
            handle.pipeline_finished();
        }

        // After the pipeline finishes, the TUI may still request job
        // restarts; those update the collected summaries in place.
        if let Some(commands) = ui_command_rx.as_mut() {
            orchestrator::handle_restart_commands(
                self,
                plan.clone(),
                ui_bridge.clone(),
                commands,
                &mut summaries,
            )
            .await?;
        }

        let history_entry = self.record_pipeline_history(&summaries, &resource_map);
        if let (Some(entry), Some(ui)) = (history_entry, ui_bridge.as_deref()) {
            ui.history_updated(entry);
        }

        if let Some(handle) = ui_handle {
            handle.wait_for_exit();
        }

        if !self.config.enable_tui {
            print_pipeline_summary(
                &display,
                &plan,
                &summaries,
                &self.session_dir,
                display::print_line,
            );
        }
        result
    }
276
277 fn plan_jobs(&self) -> Result<ExecutionPlan> {
278 let run_manual = env::var("OPAL_RUN_MANUAL").is_ok_and(|v| v == "1");
279 let ctx = RuleContext::from_env(&self.config.workdir, self.shared_env.clone(), run_manual);
280 ctx.ensure_valid_tag_context()?;
281 if !pipeline::rules::filters_allow(&self.pipeline.filters, &ctx) {
282 return Ok(empty_execution_plan());
283 }
284 if let Some(workflow) = &self.pipeline.workflow
285 && !pipeline::rules::evaluate_workflow(&workflow.rules, &ctx)?
286 {
287 return Ok(empty_execution_plan());
288 }
289 let compiled = compile_pipeline(&self.pipeline, Some(&ctx))?;
290 let mut plan = build_execution_plan(compiled, |job| self.job_log_info(job))?;
291 if !self.config.selected_jobs.is_empty() {
292 plan = plan.select_jobs(&self.config.selected_jobs)?;
293 }
294 Ok(plan)
295 }
296
    /// Builds the planning-time `JobResourceInfo` for every job in the
    /// plan: artifact root/paths and described cache entries. Runtime
    /// fields (container/network names) are left unset here and merged
    /// in later by `record_pipeline_history`.
    fn collect_job_resources(&self, plan: &ExecutionPlan) -> HashMap<String, JobResourceInfo> {
        plan.nodes
            .values()
            .map(|planned| {
                // Only jobs that declare artifact paths get an artifact root.
                let artifact_dir = if planned.instance.job.artifacts.paths.is_empty() {
                    None
                } else {
                    Some(
                        self.artifacts
                            .job_artifacts_root(&planned.instance.job.name)
                            .display()
                            .to_string(),
                    )
                };
                let artifact_paths = planned
                    .instance
                    .job
                    .artifacts
                    .paths
                    .iter()
                    .map(|path| path.display().to_string())
                    .collect();
                // Cache keys may reference job env vars, so the full job
                // environment is materialized for cache description.
                let env_vars = self.job_env(&planned.instance.job);
                let cache_env: HashMap<String, String> = env_vars.iter().cloned().collect();
                let caches = self
                    .cache
                    .describe_entries(
                        &planned.instance.job.cache,
                        &self.config.workdir,
                        &cache_env,
                    )
                    .into_iter()
                    .map(|entry| HistoryCache {
                        key: entry.key,
                        policy: cache_policy_label(entry.policy).to_string(),
                        host: entry.host.display().to_string(),
                        paths: entry
                            .paths
                            .iter()
                            .map(|path| path.display().to_string())
                            .collect(),
                    })
                    .collect();
                (
                    planned.instance.job.name.clone(),
                    JobResourceInfo {
                        artifact_dir,
                        artifact_paths,
                        caches,
                        container_name: None,
                        service_network: None,
                        service_containers: Vec::new(),
                        runtime_summary_path: None,
                    },
                )
            })
            .collect()
    }
356
357 fn convert_ui_resources(
358 resources: &HashMap<String, JobResourceInfo>,
359 ) -> HashMap<String, UiJobResources> {
360 resources
361 .iter()
362 .map(|(name, info)| {
363 (
364 name.clone(),
365 UiJobResources {
366 artifact_dir: info.artifact_dir.clone(),
367 artifact_paths: info.artifact_paths.clone(),
368 caches: info.caches.clone(),
369 container_name: info.container_name.clone(),
370 service_network: info.service_network.clone(),
371 service_containers: info.service_containers.clone(),
372 runtime_summary_path: info.runtime_summary_path.clone(),
373 },
374 )
375 })
376 .collect()
377 }
378
    /// Records this run in the history store, merging planning-time
    /// resource info with whatever runtime objects (container, network,
    /// summary path) were captured during execution. Runtime values take
    /// precedence over the planning-time placeholders.
    fn record_pipeline_history(
        &self,
        summaries: &[JobSummary],
        resources: &HashMap<String, JobResourceInfo>,
    ) -> Option<HistoryEntry> {
        let resource_map = resources
            .iter()
            .map(|(name, info)| {
                let runtime = self.runtime_state.runtime_objects(name);
                (
                    name.clone(),
                    history_store::HistoryResources {
                        artifact_dir: info.artifact_dir.clone(),
                        artifacts: info.artifact_paths.clone(),
                        caches: info.caches.clone(),
                        // Prefer live runtime data; fall back to planned info.
                        container_name: runtime
                            .as_ref()
                            .and_then(|objects| objects.container_name.clone())
                            .or_else(|| info.container_name.clone()),
                        service_network: runtime
                            .as_ref()
                            .and_then(|objects| objects.service_network.clone())
                            .or_else(|| info.service_network.clone()),
                        service_containers: runtime
                            .as_ref()
                            .map(|objects| objects.service_containers.clone())
                            .unwrap_or_else(|| info.service_containers.clone()),
                        runtime_summary_path: runtime
                            .as_ref()
                            .and_then(|objects| objects.runtime_summary_path.clone())
                            .or_else(|| info.runtime_summary_path.clone()),
                    },
                )
            })
            .collect();
        self.history_store
            .record(&self.run_id, summaries, &resource_map)
    }
417
    /// Records the runtime objects (container, service network/containers,
    /// summary path) created for a job; delegates to `runtime_state`.
    pub(crate) fn record_runtime_objects(
        &self,
        job_name: &str,
        container_name: String,
        service_network: Option<String>,
        service_containers: Vec<String>,
        runtime_summary_path: Option<String>,
    ) {
        self.runtime_state.record_runtime_objects(
            job_name,
            container_name,
            service_network,
            service_containers,
            runtime_summary_path,
        );
    }
434
    /// Writes a runtime summary file for the job's container/services and
    /// returns its path if one was written; delegates to `runtime_summary`.
    pub(crate) fn write_runtime_summary(
        &self,
        job_name: &str,
        container_name: &str,
        service_network: Option<&str>,
        service_containers: &[String],
    ) -> Result<Option<String>> {
        runtime_summary::write_runtime_summary(
            self,
            job_name,
            container_name,
            service_network,
            service_containers,
        )
    }
450
    /// Announces a job start (to the terminal and/or UI bridge) and returns
    /// its run info; delegates to `launch::log_job_start`.
    pub(crate) fn log_job_start(
        &self,
        planned: &ExecutableJob,
        ui: Option<&UiBridge>,
    ) -> Result<JobRunInfo> {
        launch::log_job_start(self, planned, ui)
    }
458
    /// Prepares everything required to launch a job (delegates to
    /// `preparer::prepare_job_run`).
    pub(crate) fn prepare_job_run(
        &self,
        plan: &ExecutionPlan,
        job: &JobSpec,
    ) -> Result<preparer::PreparedJobRun> {
        preparer::prepare_job_run(self, plan, job)
    }
466
    /// Collects a job's untracked-file artifacts from its workspace;
    /// delegates to the `ArtifactManager`.
    pub(crate) fn collect_untracked_artifacts(
        &self,
        job: &JobSpec,
        workspace: &Path,
    ) -> Result<()> {
        self.artifacts.collect_untracked(job, workspace)
    }
474
    /// Collects the artifact paths a job declares, translating container
    /// paths via the mount list and `container_workdir`.
    pub(crate) fn collect_declared_artifacts(
        &self,
        job: &JobSpec,
        workspace: &Path,
        mounts: &[crate::pipeline::VolumeMount],
    ) -> Result<()> {
        self.artifacts
            .collect_declared(job, workspace, mounts, &self.container_workdir)
    }
484
    /// Collects a job's dotenv report artifacts, translating container
    /// paths via the mount list and `container_workdir`.
    pub(crate) fn collect_dotenv_artifacts(
        &self,
        job: &JobSpec,
        workspace: &Path,
        mounts: &[crate::pipeline::VolumeMount],
    ) -> Result<()> {
        self.artifacts
            .collect_dotenv_report(job, workspace, mounts, &self.container_workdir)
    }
494
    /// Forgets the running-container record for a job (delegates to
    /// `runtime_state`).
    pub(crate) fn clear_running_container(&self, job_name: &str) {
        self.runtime_state.clear_running_container(job_name);
    }
498
    /// Marks a job completed with its artifact-source outcome (delegates
    /// to `runtime_state`).
    pub(crate) fn record_completed_job(&self, job_name: &str, outcome: ArtifactSourceOutcome) {
        self.runtime_state.record_completed_job(job_name, outcome);
    }
502
    /// Returns a snapshot of completed jobs and their artifact outcomes.
    pub(crate) fn completed_jobs(&self) -> HashMap<String, ArtifactSourceOutcome> {
        self.runtime_state.completed_jobs()
    }
506
    /// Consumes and returns this job's cancelled flag (delegates to
    /// `runtime_state`).
    pub(crate) fn take_cancelled_job(&self, job_name: &str) -> bool {
        self.runtime_state.take_cancelled_job(job_name)
    }
510
511 pub(crate) fn cancel_running_job(&self, job_name: &str) -> bool {
512 let container = self.runtime_state.running_container(job_name);
513 if let Some(container_name) = container {
514 self.runtime_state.mark_job_cancelled(job_name);
515 self.kill_container(job_name, &container_name);
516 true
517 } else {
518 false
519 }
520 }
521
    /// Runs a prepared execution context (delegates to `process::execute`).
    pub(crate) fn execute(&self, ctx: ExecuteContext<'_>) -> Result<()> {
        process::execute(self, ctx)
    }
525
526 pub(crate) fn print_job_completion(
527 &self,
528 stage_name: &str,
529 script_path: &Path,
530 log_path: &Path,
531 elapsed: f32,
532 ) {
533 if !self.config.enable_tui {
534 let display = self.display();
535 display::print_line(format!(" script stored at {}", script_path.display()));
536 display::print_line(format!(" log file stored at {}", log_path.display()));
537 let finish_label = display.bold_green(" ✓ finished in");
538 display::print_line(format!("{} {:.2}s", finish_label, elapsed));
539
540 if let Some(stage_elapsed) = self.stage_tracker.complete_job(stage_name) {
541 let stage_footer = display.bold_blue("╰─ stage complete in");
542 display::print_line(format!("{stage_footer} {:.2}s", stage_elapsed));
543 }
544 }
545 }
546
    /// Kills the named container for a job (delegates to `lifecycle`).
    pub(crate) fn kill_container(&self, job_name: &str, container_name: &str) {
        lifecycle::kill_container(self, job_name, container_name);
    }
550
    /// Cleans up a container that has finished running (delegates to
    /// `lifecycle`).
    pub(crate) fn cleanup_finished_container(&self, container_name: &str) {
        lifecycle::cleanup_finished_container(self, container_name);
    }
554
    /// Resolves a job's image spec, optionally expanding it against the
    /// given env lookup (delegates to `launch`).
    fn resolve_job_image_with_env(
        &self,
        job: &JobSpec,
        env_lookup: Option<&HashMap<String, String>>,
    ) -> Result<crate::model::ImageSpec> {
        launch::resolve_job_image_with_env(self, job, env_lookup)
    }
562
    /// Builds the full environment for one job from the included env vars,
    /// pipeline default variables, secrets, host/container paths, run id,
    /// and the shared process environment.
    fn job_env(&self, job: &JobSpec) -> Vec<(String, String)> {
        build_job_env(
            &self.env_vars,
            &self.pipeline.defaults.variables,
            job,
            &self.secrets,
            &self.config.workdir,
            &self.container_workdir,
            Path::new(CONTAINER_ROOT),
            &self.run_id,
            &self.shared_env,
        )
    }
576
577 pub(crate) fn expanded_environment(
578 &self,
579 job: &JobSpec,
580 ) -> Option<crate::model::EnvironmentSpec> {
581 let environment = job.environment.as_ref()?;
582 let lookup: HashMap<String, String> = self.job_env(job).into_iter().collect();
583 Some(crate::env::expand_environment(environment, &lookup))
584 }
585
    /// Creates a formatter honoring this run's color setting.
    fn display(&self) -> DisplayFormatter {
        DisplayFormatter::new(self.use_color)
    }
589
    /// Runs AI analysis of a job's output with the configured default
    /// provider (Ollama if unset), streaming text chunks to the UI and
    /// optionally saving the result under the session dir. All failures
    /// are reported through `ui.analysis_finished` rather than returned.
    pub(crate) fn analyze_job_with_default_provider(
        &self,
        plan: &ExecutionPlan,
        job_name: &str,
        source_name: &str,
        ui: Option<&UiBridge>,
    ) {
        let provider = self
            .config
            .settings
            .ai_settings()
            .default_provider
            .unwrap_or(crate::config::AiProviderConfig::Ollama);
        let provider_kind = match provider {
            crate::config::AiProviderConfig::Ollama => AiProviderKind::Ollama,
            crate::config::AiProviderConfig::Claude => AiProviderKind::Claude,
            crate::config::AiProviderConfig::Codex => AiProviderKind::Codex,
        };
        let provider_label = match provider_kind {
            AiProviderKind::Ollama => "ollama",
            AiProviderKind::Claude => "claude",
            AiProviderKind::Codex => "codex",
        };
        if let Some(ui) = ui {
            ui.analysis_started(job_name, provider_label);
        }

        // The whole analysis runs inside a closure so any error lands in
        // `outcome` and is reported via the UI below.
        let outcome = (|| -> Result<Option<PathBuf>> {
            // Ollama has no implicit default model; require explicit config.
            if provider_kind == AiProviderKind::Ollama
                && self
                    .config
                    .settings
                    .ai_settings()
                    .ollama
                    .model
                    .trim()
                    .is_empty()
            {
                anyhow::bail!(
                    "Ollama analysis requires [ai.ollama].model in config; Opal does not choose a default model for you"
                );
            }
            let rendered = self.render_ai_prompt_parts(plan, job_name, source_name)?;
            // Secrets are masked out of both prompt and system text before
            // anything is sent to a provider.
            let prompt = self.secrets.mask_fragment(&rendered.prompt);
            let system = rendered
                .system
                .as_deref()
                .map(|text| self.secrets.mask_fragment(text).into_owned());

            let save_path = self.config.settings.ai_settings().save_analysis.then(|| {
                self.session_dir
                    .join(job_name_slug(job_name))
                    .join("analysis")
                    .join(format!("{provider_label}.md"))
            });

            let request = AiRequest {
                provider: provider_kind,
                prompt: prompt.into_owned(),
                system,
                // host/model/command/workdir only apply to specific providers.
                host: (provider_kind == AiProviderKind::Ollama)
                    .then(|| self.config.settings.ai_settings().ollama.host.clone()),
                model: match provider_kind {
                    AiProviderKind::Ollama => {
                        Some(self.config.settings.ai_settings().ollama.model.clone())
                    }
                    AiProviderKind::Codex => self.config.settings.ai_settings().codex.model.clone(),
                    AiProviderKind::Claude => None,
                },
                command: (provider_kind == AiProviderKind::Codex)
                    .then(|| self.config.settings.ai_settings().codex.command.clone()),
                args: Vec::new(),
                workdir: (provider_kind == AiProviderKind::Codex)
                    .then(|| self.config.workdir.clone()),
                save_path: save_path.clone(),
            };

            let result = ai::analyze_with_default_provider(&request, |chunk| {
                if let (Some(ui), ai::AiChunk::Text(text)) = (ui, chunk) {
                    ui.analysis_chunk(job_name, &text);
                }
            })
            .map_err(|err| anyhow::anyhow!(err.message))?;

            if let Some(path) = &save_path {
                if let Some(parent) = path.parent() {
                    fs::create_dir_all(parent)?;
                }
                fs::write(path, result.text)?;
            }

            Ok(save_path)
        })();

        if let Some(ui) = ui {
            match outcome {
                // On success the saved file (if any) is re-read so the UI
                // shows the final persisted text.
                Ok(saved_path) => ui.analysis_finished(
                    job_name,
                    if let Some(path) = &saved_path {
                        fs::read_to_string(path).unwrap_or_default()
                    } else {
                        String::new()
                    },
                    saved_path,
                    None,
                ),
                Err(err) => {
                    ui.analysis_finished(job_name, String::new(), None, Some(err.to_string()))
                }
            }
        }
    }
702
703 pub(crate) fn render_ai_prompt(
704 &self,
705 plan: &ExecutionPlan,
706 job_name: &str,
707 source_name: &str,
708 ) -> Result<String> {
709 let rendered = self.render_ai_prompt_parts(plan, job_name, source_name)?;
710 let mut text = String::new();
711 if let Some(system) = rendered.system {
712 text.push_str("# System\n\n");
713 text.push_str(system.trim());
714 text.push_str("\n\n");
715 }
716 text.push_str("# Prompt\n\n");
717 text.push_str(rendered.prompt.trim());
718 text.push('\n');
719 Ok(text)
720 }
721
    /// Builds the AI context for a job and renders it into prompt/system
    /// parts using the configured AI settings.
    fn render_ai_prompt_parts(
        &self,
        plan: &ExecutionPlan,
        job_name: &str,
        source_name: &str,
    ) -> Result<crate::ai::RenderedPrompt> {
        let context = self.build_ai_context(plan, job_name, source_name)?;
        render_job_analysis_prompt(
            &self.config.workdir,
            self.config.settings.ai_settings(),
            &context,
        )
    }
735
    /// Assembles everything the AI prompt needs for one job: runner
    /// description, the job's YAML fragment, a dependency/needs summary,
    /// the runtime summary file (if present), and a log tail excerpt.
    ///
    /// # Errors
    /// Fails if the job is not in the plan, the pipeline YAML cannot be
    /// read/parsed, or the job's log file cannot be read.
    fn build_ai_context(
        &self,
        plan: &ExecutionPlan,
        job_name: &str,
        source_name: &str,
    ) -> Result<AiContext> {
        let planned = plan
            .nodes
            .get(job_name)
            .with_context(|| format!("selected job '{job_name}' not found in execution plan"))?;
        let runner = self.ui_runner_info_for_job(&planned.instance.job);
        let runner_summary = format!(
            "engine={} arch={} vcpu={} ram={}",
            runner.engine,
            runner.arch.unwrap_or_else(|| "native/default".to_string()),
            runner.cpus.unwrap_or_else(|| "engine default".to_string()),
            runner
                .memory
                .unwrap_or_else(|| "engine default".to_string())
        );
        let job_yaml = self.load_job_yaml_fragment(source_name)?;
        let pipeline_summary = format!(
            "dependencies: {}\nneeds: {}\nallow_failure: {}\ninterruptible: {}",
            if planned.instance.dependencies.is_empty() {
                "none".to_string()
            } else {
                planned.instance.dependencies.join(", ")
            },
            if planned.instance.job.needs.is_empty() {
                "none".to_string()
            } else {
                planned
                    .instance
                    .job
                    .needs
                    .iter()
                    .map(|need| {
                        // Annotate needs that also pull artifacts.
                        if need.needs_artifacts {
                            format!("{} (artifacts)", need.job)
                        } else {
                            need.job.clone()
                        }
                    })
                    .collect::<Vec<_>>()
                    .join(", ")
            },
            planned.instance.rule.allow_failure,
            planned.instance.interruptible,
        );
        // Missing or unreadable runtime summary is simply omitted.
        let runtime_summary = self
            .runtime_state
            .runtime_objects(job_name)
            .and_then(|objects| objects.runtime_summary_path)
            .and_then(|path| fs::read_to_string(path).ok());
        let log_excerpt = self.read_job_log_excerpt(&planned.log_path)?;

        Ok(AiContext {
            job_name: job_name.to_string(),
            source_name: source_name.to_string(),
            stage: planned.instance.stage_name.clone(),
            job_yaml,
            runner_summary,
            pipeline_summary,
            runtime_summary,
            log_excerpt,
            failure_hint: None,
        })
    }
804
805 fn load_job_yaml_fragment(&self, source_name: &str) -> Result<String> {
806 let content = fs::read_to_string(&self.config.pipeline)
807 .with_context(|| format!("failed to read {}", self.config.pipeline.display()))?;
808 let yaml: serde_yaml::Value = serde_yaml::from_str(&content)
809 .with_context(|| format!("failed to parse {}", self.config.pipeline.display()))?;
810 let Some(mapping) = yaml.as_mapping() else {
811 return Ok(format!("# job '{source_name}' not found"));
812 };
813 for (key, value) in mapping {
814 if key.as_str() == Some(source_name) {
815 let mut root = serde_yaml::Mapping::new();
816 root.insert(
817 serde_yaml::Value::String(source_name.to_string()),
818 value.clone(),
819 );
820 return Ok(serde_yaml::to_string(&serde_yaml::Value::Mapping(root))?);
821 }
822 }
823 Ok(format!("# job '{source_name}' not found"))
824 }
825
826 fn read_job_log_excerpt(&self, path: &Path) -> Result<String> {
827 let content = fs::read_to_string(path)
828 .with_context(|| format!("failed to read log {}", path.display()))?;
829 let tail_lines = self.config.settings.ai_settings().tail_lines.max(50);
830 let lines: Vec<&str> = content.lines().collect();
831 let start = lines.len().saturating_sub(tail_lines);
832 Ok(lines[start..].join("\n"))
833 }
834
    /// Describes the runner for a job (engine label, architecture, cpu and
    /// memory limits) for display in the UI and AI prompts. Arch resolution
    /// is a fallback chain whose order depends on the engine; cpu/memory
    /// defaults apply only to the `container` CLI engine.
    fn ui_runner_info_for_job(&self, job: &JobSpec) -> crate::ui::types::UiRunnerInfo {
        let engine = match self.config.engine {
            EngineKind::ContainerCli => "container",
            EngineKind::Docker => "docker",
            EngineKind::Podman => "podman",
            EngineKind::Nerdctl => "nerdctl",
            EngineKind::Orbstack => "orbstack",
        }
        .to_string();

        let job_override = self.config.settings.job_override_for(&job.name);
        let image_platform = job
            .image
            .as_ref()
            .and_then(|image| image.docker_platform.as_deref());
        let arch = match self.config.engine {
            // container CLI: job override > container settings >
            // OPAL_CONTAINER_ARCH > image platform > host arch.
            EngineKind::ContainerCli => job_override
                .as_ref()
                .and_then(|cfg| cfg.arch.clone())
                .or_else(|| {
                    self.config
                        .settings
                        .container_settings()
                        .and_then(|cfg| cfg.arch.clone())
                })
                .or_else(|| std::env::var("OPAL_CONTAINER_ARCH").ok())
                .or_else(|| image_platform.and_then(container_arch_from_platform))
                .or_else(|| normalize_container_arch(std::env::consts::ARCH)),
            // Other engines: image platform > job override > host arch.
            _ => image_platform
                .and_then(container_arch_from_platform)
                .or_else(|| job_override.as_ref().and_then(|cfg| cfg.arch.clone()))
                .or_else(|| normalize_container_arch(std::env::consts::ARCH)),
        };

        let (cpus, memory) = match self.config.engine {
            EngineKind::ContainerCli => {
                let settings = self.config.settings.container_settings();
                (
                    Some(
                        settings
                            .and_then(|cfg| cfg.cpus.clone())
                            .unwrap_or_else(|| "4".to_string()),
                    ),
                    Some(
                        settings
                            .and_then(|cfg| cfg.memory.clone())
                            .unwrap_or_else(|| "1638m".to_string()),
                    ),
                )
            }
            _ => (None, None),
        };

        crate::ui::types::UiRunnerInfo {
            engine,
            arch,
            cpus,
            memory,
        }
    }
895
    /// Computes a job's log path and hash (delegates to `logging`).
    fn job_log_info(&self, job: &JobSpec) -> (PathBuf, String) {
        logging::job_log_info(&self.logs_dir, &self.run_id, job)
    }
899
    /// Translates a host path into its container-side equivalent using the
    /// session-dir and workdir mappings (delegates to `paths`).
    fn container_path_rel(&self, host_path: &Path) -> Result<PathBuf> {
        paths::to_container_path(
            host_path,
            &[
                (&*self.session_dir, &*self.container_session_dir),
                (&*self.config.workdir, &*self.container_workdir),
            ],
        )
    }
909}
910
/// Returns a plan with no jobs — used when pipeline filters or workflow
/// rules reject the entire run.
fn empty_execution_plan() -> ExecutionPlan {
    ExecutionPlan {
        ordered: Vec::new(),
        nodes: HashMap::new(),
        dependents: HashMap::new(),
        order_index: HashMap::new(),
        variants: HashMap::new(),
    }
}
920
/// Maps a cache policy to its display/history label.
fn cache_policy_label(policy: CachePolicySpec) -> &'static str {
    match policy {
        CachePolicySpec::Pull => "pull",
        CachePolicySpec::Push => "push",
        CachePolicySpec::PullPush => "pull-push",
    }
}
928
#[cfg(test)]
mod tests {
    // No unit tests defined in this module yet.
    }