1mod history_store;
2mod launch;
3mod lifecycle;
4mod preparer;
5mod process;
6mod registry;
7mod runtime_state;
8mod runtime_summary;
9mod stage_tracker;
10mod workspace;
11
12use super::{orchestrator, paths};
13use crate::compiler::compile_pipeline;
14use crate::display::{self, DisplayFormatter, collect_pipeline_plan, print_pipeline_summary};
15use crate::env::{build_job_env, collect_env_vars, expand_env_list};
16use crate::execution_plan::{ExecutableJob, ExecutionPlan, build_execution_plan};
17use crate::history::{HistoryCache, HistoryEntry};
18use crate::logging;
19use crate::model::{ArtifactSourceOutcome, CachePolicySpec, JobSpec, PipelineSpec};
20use crate::naming::generate_run_id;
21use crate::pipeline::{
22 self, ArtifactManager, CacheManager, ExternalArtifactsManager, JobRunInfo, JobSummary,
23 RuleContext,
24};
25use crate::runner::ExecuteContext;
26use crate::secrets::SecretsStore;
27use crate::terminal::should_use_color;
28use crate::ui::{UiBridge, UiHandle, UiJobInfo, UiJobResources};
29use crate::{ExecutorConfig, runtime};
30use anyhow::{Context, Result};
31use std::collections::HashMap;
32use std::env;
33use std::fs;
34use std::path::{Path, PathBuf};
35use std::sync::Arc;
36use tokio::sync::mpsc;
37
/// Root directory for container-side project paths.
///
/// `ExecutorCore::new` derives the container workdir as
/// `CONTAINER_ROOT/<workdir directory name>`, and `job_env` passes this root to
/// `build_job_env`.
pub(super) const CONTAINER_ROOT: &str = "/builds";
39
/// State shared by everything that executes a single pipeline run.
///
/// All fields are populated once by [`ExecutorCore::new`].
#[derive(Debug, Clone)]
pub struct ExecutorCore {
    /// Configuration this executor was built from.
    pub config: ExecutorConfig,
    /// Pipeline loaded from `config.pipeline`.
    pipeline: PipelineSpec,
    /// Result of `should_use_color()`; passed to `DisplayFormatter::new`.
    use_color: bool,
    /// `scripts` subdirectory of the session directory.
    scripts_dir: PathBuf,
    /// Directory returned by `runtime::logs_dir` for this run.
    logs_dir: PathBuf,
    /// Directory returned by `runtime::session_dir` for this run; recreated empty in `new`.
    session_dir: PathBuf,
    /// `/opal/<run_id>`; the container-side counterpart of `session_dir` in `container_path_rel`.
    container_session_dir: PathBuf,
    /// Identifier generated for this run by `generate_run_id`.
    run_id: String,
    /// True when `config.trace_scripts` is set or `OPAL_DEBUG` is `1`/`true`.
    verbose_scripts: bool,
    /// Variables collected from `config.env_includes`, expanded against the process environment.
    env_vars: Vec<(String, String)>,
    /// Process environment extended with `env_vars` and the secrets' env pairs.
    shared_env: HashMap<String, String>,
    /// `CONTAINER_ROOT/<workdir directory name>` (falls back to `project`).
    container_workdir: PathBuf,
    /// Tracker built from each stage's name and job count.
    stage_tracker: stage_tracker::StageTracker,
    /// Per-run bookkeeping: running containers, cancelled/completed jobs, runtime objects.
    runtime_state: runtime_state::RuntimeState,
    /// History store loaded from `runtime::history_path()`.
    history_store: history_store::HistoryStore,
    /// Secrets loaded from `config.workdir`.
    secrets: SecretsStore,
    /// Artifact manager rooted at the session directory.
    artifacts: ArtifactManager,
    /// Cache manager rooted at `runtime::cache_root()`.
    cache: CacheManager,
    /// Present only when `config.gitlab` is configured.
    external_artifacts: Option<ExternalArtifactsManager>,
}
62
/// Per-job resource description computed from the execution plan.
///
/// `collect_job_resources` fills the artifact and cache fields and leaves the
/// container/service/runtime-summary fields empty; `record_pipeline_history`
/// prefers values recorded in the runtime state and falls back to these.
#[derive(Debug, Clone)]
struct JobResourceInfo {
    /// Artifact root of the job, or `None` when the job declares no artifact paths.
    artifact_dir: Option<String>,
    /// Declared artifact paths, rendered with `Path::display`.
    artifact_paths: Vec<String>,
    /// Cache entries described by the cache manager for this job.
    caches: Vec<HistoryCache>,
    /// Name of the job container, when known.
    container_name: Option<String>,
    /// Name of the service network, when known.
    service_network: Option<String>,
    /// Names of the service containers, when known.
    service_containers: Vec<String>,
    /// Path of the runtime summary file, when known.
    runtime_summary_path: Option<String>,
}
73
74impl ExecutorCore {
75 pub fn new(config: ExecutorConfig) -> Result<Self> {
77 let pipeline =
78 PipelineSpec::from_path_with_gitlab(&config.pipeline, config.gitlab.as_ref())?;
79 let run_id = generate_run_id(&config);
80 let runs_root = runtime::runs_root();
81 fs::create_dir_all(&runs_root)
82 .with_context(|| format!("failed to create {:?}", runs_root))?;
83
84 let session_dir = runtime::session_dir(&run_id);
85 if session_dir.exists() {
86 fs::remove_dir_all(&session_dir)
87 .with_context(|| format!("failed to clean {:?}", session_dir))?;
88 }
89 fs::create_dir_all(&session_dir)
90 .with_context(|| format!("failed to create {:?}", session_dir))?;
91
92 let scripts_dir = session_dir.join("scripts");
93 fs::create_dir_all(&scripts_dir)
94 .with_context(|| format!("failed to create {:?}", scripts_dir))?;
95
96 let logs_dir = runtime::logs_dir(&run_id);
97 fs::create_dir_all(&logs_dir)
98 .with_context(|| format!("failed to create {:?}", logs_dir))?;
99
100 let history_store = history_store::HistoryStore::load(runtime::history_path());
101
102 let use_color = should_use_color();
103 let env_verbose = env::var_os("OPAL_DEBUG")
104 .map(|val| {
105 let s = val.to_string_lossy();
106 s == "1" || s.eq_ignore_ascii_case("true")
107 })
108 .unwrap_or(false);
109 let verbose_scripts = config.trace_scripts || env_verbose;
110 let mut env_vars = collect_env_vars(&config.env_includes)?;
111 let mut shared_env: HashMap<String, String> = env::vars().collect();
112 expand_env_list(&mut env_vars[..], &shared_env);
113 shared_env.extend(env_vars.iter().cloned());
114 let stage_specs: Vec<(String, usize)> = pipeline
115 .stages
116 .iter()
117 .map(|stage| (stage.name.clone(), stage.jobs.len()))
118 .collect();
119 let stage_tracker = stage_tracker::StageTracker::new(&stage_specs);
120
121 let secrets = SecretsStore::load(&config.workdir)?;
122 shared_env.extend(secrets.env_pairs());
123 let artifacts = ArtifactManager::new(session_dir.clone());
124 let cache_root = runtime::cache_root();
125 fs::create_dir_all(&cache_root)
126 .with_context(|| format!("failed to create cache root {:?}", cache_root))?;
127 let cache = CacheManager::new(cache_root);
128 let external_artifacts = config.gitlab.as_ref().map(|cfg| {
129 ExternalArtifactsManager::new(
130 session_dir.clone(),
131 cfg.base_url.clone(),
132 cfg.token.clone(),
133 )
134 });
135 let project_dir = config
136 .workdir
137 .file_name()
138 .and_then(|n| n.to_str())
139 .unwrap_or("project");
140 let container_workdir = Path::new(CONTAINER_ROOT).join(project_dir);
141 let container_session_dir = Path::new("/opal").join(&run_id);
142
143 let core = Self {
144 config,
145 pipeline,
146 use_color,
147 scripts_dir,
148 logs_dir,
149 session_dir,
150 container_session_dir,
151 run_id,
152 verbose_scripts,
153 env_vars,
154 shared_env,
155 container_workdir,
156 stage_tracker,
157 runtime_state: runtime_state::RuntimeState::default(),
158 history_store,
159 secrets,
160 artifacts,
161 cache,
162 external_artifacts,
163 };
164
165 registry::ensure_registry_logins(&core)?;
166
167 Ok(core)
168 }
169
    /// Plans the pipeline, executes it, records history and returns the
    /// execution result.
    ///
    /// With `config.enable_tui` set, a `UiHandle` is started and its command
    /// receiver is handed to the orchestrator; otherwise a local command channel
    /// is used and the pipeline summary is printed at the end.
    ///
    /// # Errors
    /// Returns errors from planning, from starting the UI, from
    /// `orchestrator::handle_restart_commands`, or the result produced by
    /// `orchestrator::execute_plan`.
    pub async fn run(&self) -> Result<()> {
        let plan = Arc::new(self.plan_jobs()?);
        let resource_map = self.collect_job_resources(&plan);
        let display = self.display();
        let plan_text = collect_pipeline_plan(&display, &plan).join("\n");
        let ui_resources = Self::convert_ui_resources(&resource_map);
        let history_snapshot = self.history_store.snapshot();
        // The TUI receives the jobs in plan order; names in `ordered` that have
        // no node in the plan are skipped.
        let ui_handle = if self.config.enable_tui {
            let jobs: Vec<UiJobInfo> = plan
                .ordered
                .iter()
                .filter_map(|name| plan.nodes.get(name))
                .map(|planned| UiJobInfo {
                    name: planned.instance.job.name.clone(),
                    stage: planned.instance.stage_name.clone(),
                    log_path: planned.log_path.clone(),
                    log_hash: planned.log_hash.clone(),
                })
                .collect();
            Some(UiHandle::start(
                jobs,
                history_snapshot,
                self.run_id.clone(),
                ui_resources,
                plan_text,
                self.config.workdir.clone(),
            )?)
        } else {
            None
        };
        // Without the TUI a local command channel is created. When
        // OPAL_ABORT_AFTER_SECS parses to a positive number of seconds, a spawned
        // task sends `AbortPipeline` after that delay.
        // NOTE(review): when the variable is unset or invalid, `tx` is dropped at
        // the end of this block, so the receiver is closed from the start —
        // confirm `execute_plan` tolerates a closed receiver.
        let mut owned_command_rx = if !self.config.enable_tui {
            let (tx, rx) = mpsc::unbounded_channel();
            if let Ok(raw) = env::var("OPAL_ABORT_AFTER_SECS")
                && let Ok(seconds) = raw.parse::<u64>()
                && seconds > 0
            {
                tokio::spawn(async move {
                    tokio::time::sleep(std::time::Duration::from_secs(seconds)).await;
                    let _ = tx.send(crate::ui::UiCommand::AbortPipeline);
                });
            }
            Some(rx)
        } else {
            None
        };
        // Prefer the UI's command receiver; fall back to the local one.
        let mut ui_command_rx = ui_handle
            .as_ref()
            .and_then(|handle| handle.command_receiver());
        let command_rx = ui_command_rx.as_mut().or(owned_command_rx.as_mut());
        let ui_bridge = ui_handle.as_ref().map(|handle| Arc::new(handle.bridge()));

        // `result` is held until the end so that history, UI shutdown and the
        // summary below still happen when execution reported an error.
        let (mut summaries, result) =
            orchestrator::execute_plan(self, plan.clone(), ui_bridge.clone(), command_rx).await;

        if let Some(handle) = &ui_handle {
            handle.pipeline_finished();
        }

        // Restart commands are only processed when the UI supplied a receiver.
        // NOTE(review): an error here returns early via `?`, skipping history
        // recording and `wait_for_exit` below — confirm that is intended.
        if let Some(commands) = ui_command_rx.as_mut() {
            orchestrator::handle_restart_commands(
                self,
                plan.clone(),
                ui_bridge.clone(),
                commands,
                &mut summaries,
            )
            .await?;
        }

        let history_entry = self.record_pipeline_history(&summaries, &resource_map);
        if let (Some(entry), Some(ui)) = (history_entry, ui_bridge.as_deref()) {
            ui.history_updated(entry);
        }

        if let Some(handle) = ui_handle {
            handle.wait_for_exit();
        }

        if !self.config.enable_tui {
            print_pipeline_summary(
                &display,
                &plan,
                &summaries,
                &self.session_dir,
                display::print_line,
            );
        }
        result
    }
259
260 fn plan_jobs(&self) -> Result<ExecutionPlan> {
261 let ctx = RuleContext::new(&self.config.workdir);
262 ctx.ensure_valid_tag_context()?;
263 if !pipeline::rules::filters_allow(&self.pipeline.filters, &ctx) {
264 return Ok(empty_execution_plan());
265 }
266 if let Some(workflow) = &self.pipeline.workflow
267 && !pipeline::rules::evaluate_workflow(&workflow.rules, &ctx)?
268 {
269 return Ok(empty_execution_plan());
270 }
271 let compiled = compile_pipeline(&self.pipeline, Some(&ctx))?;
272 let mut plan = build_execution_plan(compiled, |job| self.job_log_info(job))?;
273 if !self.config.selected_jobs.is_empty() {
274 plan = plan.select_jobs(&self.config.selected_jobs)?;
275 }
276 Ok(plan)
277 }
278
279 fn collect_job_resources(&self, plan: &ExecutionPlan) -> HashMap<String, JobResourceInfo> {
280 plan.nodes
281 .values()
282 .map(|planned| {
283 let artifact_dir = if planned.instance.job.artifacts.paths.is_empty() {
285 None
286 } else {
287 Some(
288 self.artifacts
289 .job_artifacts_root(&planned.instance.job.name)
290 .display()
291 .to_string(),
292 )
293 };
294 let artifact_paths = planned
295 .instance
296 .job
297 .artifacts
298 .paths
299 .iter()
300 .map(|path| path.display().to_string())
301 .collect();
302 let env_vars = self.job_env(&planned.instance.job);
303 let cache_env: HashMap<String, String> = env_vars.iter().cloned().collect();
304 let caches = self
305 .cache
306 .describe_entries(
307 &planned.instance.job.cache,
308 &self.config.workdir,
309 &cache_env,
310 )
311 .into_iter()
312 .map(|entry| HistoryCache {
313 key: entry.key,
314 policy: cache_policy_label(entry.policy).to_string(),
315 host: entry.host.display().to_string(),
316 paths: entry
317 .paths
318 .iter()
319 .map(|path| path.display().to_string())
320 .collect(),
321 })
322 .collect();
323 (
324 planned.instance.job.name.clone(),
325 JobResourceInfo {
326 artifact_dir,
327 artifact_paths,
328 caches,
329 container_name: None,
330 service_network: None,
331 service_containers: Vec::new(),
332 runtime_summary_path: None,
333 },
334 )
335 })
336 .collect()
337 }
338
339 fn convert_ui_resources(
340 resources: &HashMap<String, JobResourceInfo>,
341 ) -> HashMap<String, UiJobResources> {
342 resources
343 .iter()
344 .map(|(name, info)| {
345 (
346 name.clone(),
347 UiJobResources {
348 artifact_dir: info.artifact_dir.clone(),
349 artifact_paths: info.artifact_paths.clone(),
350 caches: info.caches.clone(),
351 container_name: info.container_name.clone(),
352 service_network: info.service_network.clone(),
353 service_containers: info.service_containers.clone(),
354 runtime_summary_path: info.runtime_summary_path.clone(),
355 },
356 )
357 })
358 .collect()
359 }
360
361 fn record_pipeline_history(
362 &self,
363 summaries: &[JobSummary],
364 resources: &HashMap<String, JobResourceInfo>,
365 ) -> Option<HistoryEntry> {
366 let resource_map = resources
367 .iter()
368 .map(|(name, info)| {
369 let runtime = self.runtime_state.runtime_objects(name);
370 (
371 name.clone(),
372 history_store::HistoryResources {
373 artifact_dir: info.artifact_dir.clone(),
374 artifacts: info.artifact_paths.clone(),
375 caches: info.caches.clone(),
376 container_name: runtime
377 .as_ref()
378 .and_then(|objects| objects.container_name.clone())
379 .or_else(|| info.container_name.clone()),
380 service_network: runtime
381 .as_ref()
382 .and_then(|objects| objects.service_network.clone())
383 .or_else(|| info.service_network.clone()),
384 service_containers: runtime
385 .as_ref()
386 .map(|objects| objects.service_containers.clone())
387 .unwrap_or_else(|| info.service_containers.clone()),
388 runtime_summary_path: runtime
389 .as_ref()
390 .and_then(|objects| objects.runtime_summary_path.clone())
391 .or_else(|| info.runtime_summary_path.clone()),
392 },
393 )
394 })
395 .collect();
396 self.history_store
397 .record(&self.run_id, summaries, &resource_map)
398 }
399
    /// Stores the runtime objects created for `job_name` in the runtime state.
    ///
    /// All arguments are forwarded unchanged to
    /// `RuntimeState::record_runtime_objects`.
    pub(crate) fn record_runtime_objects(
        &self,
        job_name: &str,
        container_name: String,
        service_network: Option<String>,
        service_containers: Vec<String>,
        runtime_summary_path: Option<String>,
    ) {
        self.runtime_state.record_runtime_objects(
            job_name,
            container_name,
            service_network,
            service_containers,
            runtime_summary_path,
        );
    }
416
    /// Delegates to `runtime_summary::write_runtime_summary` for this executor.
    ///
    /// Returns whatever the helper returns.
    /// NOTE(review): the `Option<String>` is presumably the path of the written
    /// summary — confirm against the helper.
    pub(crate) fn write_runtime_summary(
        &self,
        job_name: &str,
        container_name: &str,
        service_network: Option<&str>,
        service_containers: &[String],
    ) -> Result<Option<String>> {
        runtime_summary::write_runtime_summary(
            self,
            job_name,
            container_name,
            service_network,
            service_containers,
        )
    }
432
    /// Delegates to `launch::log_job_start`, passing the optional UI bridge through.
    pub(crate) fn log_job_start(
        &self,
        planned: &ExecutableJob,
        ui: Option<&UiBridge>,
    ) -> Result<JobRunInfo> {
        launch::log_job_start(self, planned, ui)
    }
440
    /// Delegates to `preparer::prepare_job_run` for `job` within `plan`.
    pub(crate) fn prepare_job_run(
        &self,
        plan: &ExecutionPlan,
        job: &JobSpec,
    ) -> Result<preparer::PreparedJobRun> {
        preparer::prepare_job_run(self, plan, job)
    }
448
    /// Forwards to `ArtifactManager::collect_untracked` for `job` in `workspace`.
    pub(crate) fn collect_untracked_artifacts(
        &self,
        job: &JobSpec,
        workspace: &Path,
    ) -> Result<()> {
        self.artifacts.collect_untracked(job, workspace)
    }
456
    /// Forwards to `ArtifactManager::collect_declared` for `job` in `workspace`.
    pub(crate) fn collect_declared_artifacts(&self, job: &JobSpec, workspace: &Path) -> Result<()> {
        self.artifacts.collect_declared(job, workspace)
    }
460
    /// Forwards to `ArtifactManager::collect_dotenv_report` for `job` in `workspace`.
    pub(crate) fn collect_dotenv_artifacts(&self, job: &JobSpec, workspace: &Path) -> Result<()> {
        self.artifacts.collect_dotenv_report(job, workspace)
    }
464
    /// Forwards to `RuntimeState::clear_running_container` for `job_name`.
    pub(crate) fn clear_running_container(&self, job_name: &str) {
        self.runtime_state.clear_running_container(job_name);
    }
468
    /// Records `outcome` for `job_name` via `RuntimeState::record_completed_job`.
    pub(crate) fn record_completed_job(&self, job_name: &str, outcome: ArtifactSourceOutcome) {
        self.runtime_state.record_completed_job(job_name, outcome);
    }
472
    /// Returns the map of job name to outcome held by the runtime state.
    pub(crate) fn completed_jobs(&self) -> HashMap<String, ArtifactSourceOutcome> {
        self.runtime_state.completed_jobs()
    }
476
    /// Forwards to `RuntimeState::take_cancelled_job` for `job_name`.
    ///
    /// NOTE(review): presumably returns `true` once for a job previously marked
    /// by `cancel_running_job` and clears the mark — confirm against `RuntimeState`.
    pub(crate) fn take_cancelled_job(&self, job_name: &str) -> bool {
        self.runtime_state.take_cancelled_job(job_name)
    }
480
481 pub(crate) fn cancel_running_job(&self, job_name: &str) -> bool {
482 let container = self.runtime_state.running_container(job_name);
483 if let Some(container_name) = container {
484 self.runtime_state.mark_job_cancelled(job_name);
485 self.kill_container(job_name, &container_name);
486 true
487 } else {
488 false
489 }
490 }
491
    /// Delegates to `process::execute` with the given context.
    pub(crate) fn execute(&self, ctx: ExecuteContext<'_>) -> Result<()> {
        process::execute(self, ctx)
    }
495
496 pub(crate) fn print_job_completion(
497 &self,
498 stage_name: &str,
499 script_path: &Path,
500 log_path: &Path,
501 elapsed: f32,
502 ) {
503 if !self.config.enable_tui {
504 let display = self.display();
505 display::print_line(format!(" script stored at {}", script_path.display()));
506 display::print_line(format!(" log file stored at {}", log_path.display()));
507 let finish_label = display.bold_green(" ✓ finished in");
508 display::print_line(format!("{} {:.2}s", finish_label, elapsed));
509
510 if let Some(stage_elapsed) = self.stage_tracker.complete_job(stage_name) {
511 let stage_footer = display.bold_blue("╰─ stage complete in");
512 display::print_line(format!("{stage_footer} {:.2}s", stage_elapsed));
513 }
514 }
515 }
516
    /// Delegates to `lifecycle::kill_container` for the given job and container.
    pub(crate) fn kill_container(&self, job_name: &str, container_name: &str) {
        lifecycle::kill_container(self, job_name, container_name);
    }
520
    /// Delegates to `lifecycle::cleanup_finished_container` for `container_name`.
    pub(crate) fn cleanup_finished_container(&self, container_name: &str) {
        lifecycle::cleanup_finished_container(self, container_name);
    }
524
    /// Delegates to `launch::resolve_job_image_with_env`, passing the optional
    /// environment lookup through unchanged.
    fn resolve_job_image_with_env(
        &self,
        job: &JobSpec,
        env_lookup: Option<&HashMap<String, String>>,
    ) -> Result<crate::model::ImageSpec> {
        launch::resolve_job_image_with_env(self, job, env_lookup)
    }
532
    /// Builds the environment for `job` by calling `build_job_env` with this
    /// executor's collected variables, pipeline default variables, secrets,
    /// host and container work directories, `CONTAINER_ROOT`, run id and
    /// shared environment.
    fn job_env(&self, job: &JobSpec) -> Vec<(String, String)> {
        build_job_env(
            &self.env_vars,
            &self.pipeline.defaults.variables,
            job,
            &self.secrets,
            &self.config.workdir,
            &self.container_workdir,
            Path::new(CONTAINER_ROOT),
            &self.run_id,
            &self.shared_env,
        )
    }
546
547 pub(crate) fn expanded_environment(
548 &self,
549 job: &JobSpec,
550 ) -> Option<crate::model::EnvironmentSpec> {
551 let environment = job.environment.as_ref()?;
552 let lookup: HashMap<String, String> = self.job_env(job).into_iter().collect();
553 Some(crate::env::expand_environment(environment, &lookup))
554 }
555
    /// Creates a `DisplayFormatter` honoring this executor's color setting.
    fn display(&self) -> DisplayFormatter {
        DisplayFormatter::new(self.use_color)
    }
559
    /// Delegates to `logging::job_log_info` with this run's logs directory and run id.
    ///
    /// `run` stores the returned pair's counterparts as `log_path` and `log_hash`.
    /// NOTE(review): presumably the tuple is (log path, log hash) — confirm
    /// against `logging::job_log_info`.
    fn job_log_info(&self, job: &JobSpec) -> (PathBuf, String) {
        logging::job_log_info(&self.logs_dir, &self.run_id, job)
    }
563
564 fn container_path_rel(&self, host_path: &Path) -> Result<PathBuf> {
565 paths::to_container_path(
566 host_path,
567 &[
568 (&*self.session_dir, &*self.container_session_dir),
569 (&*self.config.workdir, &*self.container_workdir),
570 ],
571 )
572 }
573}
574
/// Builds an `ExecutionPlan` with no jobs: every collection in it is empty.
///
/// `plan_jobs` returns this when the pipeline filters or workflow rules
/// reject the run.
fn empty_execution_plan() -> ExecutionPlan {
    ExecutionPlan {
        ordered: Vec::new(),
        nodes: HashMap::new(),
        dependents: HashMap::new(),
        order_index: HashMap::new(),
        variants: HashMap::new(),
    }
}
584
585fn cache_policy_label(policy: CachePolicySpec) -> &'static str {
586 match policy {
587 CachePolicySpec::Pull => "pull",
588 CachePolicySpec::Push => "push",
589 CachePolicySpec::PullPush => "pull-push",
590 }
591}
592
// Unit-test module for this file; it contains no tests in this view.
#[cfg(test)]
mod tests {
}