1use async_trait::async_trait;
7use cuenv_core::config::BackendConfig;
8use cuenv_core::environment::Environment;
9use cuenv_core::tasks::{Task, TaskBackend, TaskResult};
10use cuenv_core::{Error, Result};
11use dagger_sdk::{Config, ContainerId, connect_opts};
12use std::collections::HashMap;
13use std::path::Path;
14use std::sync::{Arc, Mutex};
15
16type DaggerReport = Box<dyn std::error::Error + Send + Sync + 'static>;
17
18pub struct DaggerBackend {
20 default_image: Option<String>,
21 project_root: std::path::PathBuf,
22 container_cache: Arc<Mutex<HashMap<String, ContainerId>>>,
23}
24
25impl DaggerBackend {
26 pub fn new(default_image: Option<String>, project_root: std::path::PathBuf) -> Self {
27 Self {
28 default_image,
29 project_root,
30 container_cache: Arc::new(Mutex::new(HashMap::new())),
31 }
32 }
33
34 pub fn container_cache(&self) -> &Arc<Mutex<HashMap<String, ContainerId>>> {
36 &self.container_cache
37 }
38}
39
40#[async_trait]
41impl TaskBackend for DaggerBackend {
42 async fn execute(
43 &self,
44 name: &str,
45 task: &Task,
46 env: &Environment,
47 _project_root: &Path,
48 capture_output: bool,
49 ) -> Result<TaskResult> {
50 let dagger_config = task.dagger.as_ref();
51
52 let from_task = dagger_config.and_then(|d| d.from.clone());
54 let image = dagger_config
55 .and_then(|d| d.image.clone())
56 .or_else(|| self.default_image.clone());
57
58 if from_task.is_none() && image.is_none() {
60 return Err(Error::configuration(
61 "Dagger backend requires either 'image' or 'from' (task reference). \
62 Set tasks.<name>.dagger.image, tasks.<name>.dagger.from, or config.backend.options.image"
63 .to_string(),
64 ));
65 }
66
67 let command: Vec<String> = std::iter::once(task.command.clone())
68 .chain(task.args.clone())
69 .collect();
70
71 if command.is_empty() {
72 return Err(Error::configuration(
73 "Dagger task requires a command to execute".to_string(),
74 ));
75 }
76
77 let mut resolved_secrets: Vec<(String, Option<String>, Option<String>, String)> =
79 Vec::new();
80 if let Some(secrets) = dagger_config.and_then(|d| d.secrets.as_ref()) {
81 for secret in secrets {
82 let plaintext = secret.resolver.resolve().await?;
83 resolved_secrets.push((
84 secret.name.clone(),
85 secret.path.clone(),
86 secret.env_var.clone(),
87 plaintext,
88 ));
89 }
90 }
91
92 let cache_mounts: Vec<(String, String)> = dagger_config
94 .and_then(|d| d.cache.as_ref())
95 .map(|caches| {
96 caches
97 .iter()
98 .map(|c| (c.path.clone(), c.name.clone()))
99 .collect()
100 })
101 .unwrap_or_default();
102
103 let cached_container_id = if let Some(ref from_name) = from_task {
105 let cache = self.container_cache.lock().map_err(|_| {
106 Error::configuration("Failed to acquire container cache lock".to_string())
107 })?;
108 cache.get(from_name).cloned()
109 } else {
110 None
111 };
112
113 if from_task.is_some() && cached_container_id.is_none() {
115 return Err(Error::configuration(format!(
116 "Task '{}' references container from task '{}', but no container was found. \
117 Ensure the referenced task runs first (use dependsOn).",
118 name,
119 from_task.as_ref().unwrap()
120 )));
121 }
122
123 let env_map = env.vars.clone();
124 let project_root = self.project_root.clone();
125 let task_name = name.to_string();
126 let task_name_for_cache = task_name.clone();
127 let container_cache = self.container_cache.clone();
128
129 type ResultType = (i32, String, String, Option<ContainerId>);
131 let result_store: Arc<Mutex<Option<std::result::Result<ResultType, DaggerReport>>>> =
132 Arc::new(Mutex::new(None));
133 let result_store_clone = result_store.clone();
134
135 let cfg = Config::default();
136
137 connect_opts(cfg, move |client| {
138 let project_root = project_root.clone();
139 let image = image.clone();
140 let command = command.clone();
141 let env_map = env_map.clone();
142 let result_store = result_store_clone.clone();
143 let resolved_secrets = resolved_secrets.clone();
144 let cache_mounts = cache_mounts.clone();
145 let cached_container_id = cached_container_id.clone();
146 let task_name_inner = task_name.clone();
147
148 async move {
149 let host_dir = client
150 .host()
151 .directory(project_root.to_string_lossy().to_string());
152
153 let mut container = if let Some(container_id) = cached_container_id {
157 client
160 .load_container_from_id(container_id)
161 .with_workdir("/workspace")
162 } else if let Some(img) = image {
163 client
165 .container()
166 .from(img)
167 .with_mounted_directory("/workspace", host_dir)
168 .with_workdir("/workspace")
169 } else {
170 if let Ok(mut guard) = result_store.lock() {
172 *guard = Some(Err("No image or container reference provided".into()));
173 }
174 return Ok(());
175 };
176
177 for (path, cache_name) in &cache_mounts {
179 let cache_vol = client.cache_volume(cache_name);
180 container = container.with_mounted_cache(path, cache_vol);
181 }
182
183 for (secret_name, path, env_var, plaintext) in &resolved_secrets {
185 let dagger_secret = client.set_secret(secret_name, plaintext);
186
187 if let Some(file_path) = path {
188 container = container.with_mounted_secret(file_path, dagger_secret.clone());
189 }
190 if let Some(var_name) = env_var {
191 container = container.with_secret_variable(var_name, dagger_secret);
192 }
193 }
194
195 for (k, v) in env_map {
197 container = container.with_env_variable(k, v);
198 }
199
200 let exec = container.with_exec(command);
202
203 let stdout_res = exec.stdout().await;
205 let stderr_res = exec.stderr().await;
206 let exit_code_res = exec.exit_code().await;
207 let container_id_res = exec.id().await;
208
209 let res = match (stdout_res, stderr_res, exit_code_res, container_id_res) {
210 (Ok(stdout), Ok(stderr), Ok(exit_code), Ok(container_id)) => {
211 Ok((exit_code as i32, stdout, stderr, Some(container_id)))
212 }
213 (Ok(stdout), Ok(stderr), Ok(exit_code), Err(_)) => {
214 tracing::warn!(
216 task = %task_name_inner,
217 "Failed to get container ID for caching"
218 );
219 Ok((exit_code as i32, stdout, stderr, None))
220 }
221 (Err(e), _, _, _) => Err(e.into()),
222 (_, Err(e), _, _) => Err(e.into()),
223 (_, _, Err(e), _) => Err(e.into()),
224 };
225
226 if let Ok(mut guard) = result_store.lock() {
227 *guard = Some(res);
228 }
229 Ok(())
230 }
231 })
232 .await
233 .map_err(|err| Error::execution(format!("Dagger backend failed: {err}")))?;
234
235 let mut guard = result_store
237 .lock()
238 .map_err(|_| Error::execution("Failed to acquire lock on task result".to_string()))?;
239
240 let inner_result = guard
241 .take()
242 .ok_or_else(|| Error::execution("Task completed but produced no result".to_string()))?;
243
244 let (exit_code, stdout, stderr, container_id) = inner_result
245 .map_err(|e: DaggerReport| Error::execution(format!("Dagger execution failed: {e}")))?;
246
247 if let Some(cid) = container_id
249 && let Ok(mut cache) = container_cache.lock()
250 {
251 cache.insert(task_name_for_cache.clone(), cid);
252 }
253
254 if !capture_output {
256 if !stdout.is_empty() {
257 print!("{}", stdout);
258 }
259 if !stderr.is_empty() {
260 eprint!("{}", stderr);
261 }
262 }
263
264 Ok(TaskResult {
265 name: task_name_for_cache,
266 exit_code: Some(exit_code),
267 stdout: if capture_output {
268 stdout
269 } else {
270 String::new()
271 },
272 stderr: if capture_output {
273 stderr
274 } else {
275 String::new()
276 },
277 success: exit_code == 0,
278 })
279 }
280
281 fn name(&self) -> &'static str {
282 "dagger"
283 }
284}
285
286pub fn create_dagger_backend(
288 config: Option<&BackendConfig>,
289 project_root: std::path::PathBuf,
290) -> Arc<dyn TaskBackend> {
291 let image = config
292 .and_then(|c| c.options.as_ref())
293 .and_then(|o| o.image.clone());
294 Arc::new(DaggerBackend::new(image, project_root))
295}