1use async_trait::async_trait;
7use cuenv_core::config::BackendConfig;
8use cuenv_core::environment::Environment;
9use cuenv_core::tasks::{Task, TaskBackend, TaskResult};
10use cuenv_core::{Error, Result};
11use dagger_sdk::{Config, ContainerId, connect_opts};
12use std::collections::HashMap;
13use std::path::Path;
14use std::sync::{Arc, Mutex};
15
16type DaggerReport = Box<dyn std::error::Error + Send + Sync + 'static>;
17
18pub struct DaggerBackend {
20 default_image: Option<String>,
21 project_root: std::path::PathBuf,
22 container_cache: Arc<Mutex<HashMap<String, ContainerId>>>,
23}
24
25impl DaggerBackend {
26 pub fn new(default_image: Option<String>, project_root: std::path::PathBuf) -> Self {
27 Self {
28 default_image,
29 project_root,
30 container_cache: Arc::new(Mutex::new(HashMap::new())),
31 }
32 }
33
34 pub fn container_cache(&self) -> &Arc<Mutex<HashMap<String, ContainerId>>> {
36 &self.container_cache
37 }
38}
39
40#[async_trait]
41impl TaskBackend for DaggerBackend {
42 async fn execute(
43 &self,
44 name: &str,
45 task: &Task,
46 env: &Environment,
47 _project_root: &Path,
48 capture_output: bool,
49 ) -> Result<TaskResult> {
50 let dagger_config = task.dagger.as_ref();
51
52 let from_task = dagger_config.and_then(|d| d.from.clone());
54 let image = dagger_config
55 .and_then(|d| d.image.clone())
56 .or_else(|| self.default_image.clone());
57
58 if from_task.is_none() && image.is_none() {
60 return Err(Error::configuration(
61 "Dagger backend requires either 'image' or 'from' (task reference). \
62 Set tasks.<name>.dagger.image, tasks.<name>.dagger.from, or config.backend.options.image"
63 .to_string(),
64 ));
65 }
66
67 let command: Vec<String> = std::iter::once(task.command.clone())
68 .chain(task.args.clone())
69 .collect();
70
71 if command.is_empty() {
72 return Err(Error::configuration(
73 "Dagger task requires a command to execute".to_string(),
74 ));
75 }
76
77 let mut resolved_secrets: Vec<(String, Option<String>, Option<String>, String)> =
79 Vec::new();
80 if let Some(secrets) = dagger_config.and_then(|d| d.secrets.as_ref()) {
81 for secret in secrets {
82 let plaintext = secret.resolver.resolve().await?;
83 resolved_secrets.push((
84 secret.name.clone(),
85 secret.path.clone(),
86 secret.env_var.clone(),
87 plaintext,
88 ));
89 }
90 }
91
92 let cache_mounts: Vec<(String, String)> = dagger_config
94 .and_then(|d| d.cache.as_ref())
95 .map(|caches| {
96 caches
97 .iter()
98 .map(|c| (c.path.clone(), c.name.clone()))
99 .collect()
100 })
101 .unwrap_or_default();
102
103 let cached_container_id = if let Some(ref from_name) = from_task {
105 let cache = self.container_cache.lock().map_err(|_| {
106 Error::configuration("Failed to acquire container cache lock".to_string())
107 })?;
108 cache.get(from_name).cloned()
109 } else {
110 None
111 };
112
113 if let Some(ref from_name) = from_task
115 && cached_container_id.is_none()
116 {
117 return Err(Error::configuration(format!(
118 "Task '{}' references container from task '{}', but no container was found. \
119 Ensure the referenced task runs first (use dependsOn).",
120 name, from_name
121 )));
122 }
123
124 let env_map = env.vars.clone();
125 let project_root = self.project_root.clone();
126 let task_name = name.to_string();
127 let task_name_for_cache = task_name.clone();
128 let container_cache = self.container_cache.clone();
129
130 type ResultType = (i32, String, String, Option<ContainerId>);
132 let result_store: Arc<Mutex<Option<std::result::Result<ResultType, DaggerReport>>>> =
133 Arc::new(Mutex::new(None));
134 let result_store_clone = result_store.clone();
135
136 let cfg = Config::default();
137
138 connect_opts(cfg, move |client| {
139 let project_root = project_root.clone();
140 let image = image.clone();
141 let command = command.clone();
142 let env_map = env_map.clone();
143 let result_store = result_store_clone.clone();
144 let resolved_secrets = resolved_secrets.clone();
145 let cache_mounts = cache_mounts.clone();
146 let cached_container_id = cached_container_id.clone();
147 let task_name_inner = task_name.clone();
148
149 async move {
150 let host_dir = client
151 .host()
152 .directory(project_root.to_string_lossy().to_string());
153
154 let mut container = if let Some(container_id) = cached_container_id {
158 client
161 .load_container_from_id(container_id)
162 .with_workdir("/workspace")
163 } else if let Some(img) = image {
164 client
166 .container()
167 .from(img)
168 .with_mounted_directory("/workspace", host_dir)
169 .with_workdir("/workspace")
170 } else {
171 if let Ok(mut guard) = result_store.lock() {
173 *guard = Some(Err("No image or container reference provided".into()));
174 }
175 return Ok(());
176 };
177
178 for (path, cache_name) in &cache_mounts {
180 let cache_vol = client.cache_volume(cache_name);
181 container = container.with_mounted_cache(path, cache_vol);
182 }
183
184 for (secret_name, path, env_var, plaintext) in &resolved_secrets {
186 let dagger_secret = client.set_secret(secret_name, plaintext);
187
188 if let Some(file_path) = path {
189 container = container.with_mounted_secret(file_path, dagger_secret.clone());
190 }
191 if let Some(var_name) = env_var {
192 container = container.with_secret_variable(var_name, dagger_secret);
193 }
194 }
195
196 for (k, v) in env_map {
198 container = container.with_env_variable(k, v);
199 }
200
201 let exec = container.with_exec(command);
203
204 let stdout_res = exec.stdout().await;
206 let stderr_res = exec.stderr().await;
207 let exit_code_res = exec.exit_code().await;
208 let container_id_res = exec.id().await;
209
210 let res = match (stdout_res, stderr_res, exit_code_res, container_id_res) {
211 (Ok(stdout), Ok(stderr), Ok(exit_code), Ok(container_id)) => {
212 Ok((exit_code as i32, stdout, stderr, Some(container_id)))
213 }
214 (Ok(stdout), Ok(stderr), Ok(exit_code), Err(_)) => {
215 tracing::warn!(
217 task = %task_name_inner,
218 "Failed to get container ID for caching"
219 );
220 Ok((exit_code as i32, stdout, stderr, None))
221 }
222 (Err(e), _, _, _) => Err(e.into()),
223 (_, Err(e), _, _) => Err(e.into()),
224 (_, _, Err(e), _) => Err(e.into()),
225 };
226
227 if let Ok(mut guard) = result_store.lock() {
228 *guard = Some(res);
229 }
230 Ok(())
231 }
232 })
233 .await
234 .map_err(|err| Error::execution(format!("Dagger backend failed: {err}")))?;
235
236 let mut guard = result_store
238 .lock()
239 .map_err(|_| Error::execution("Failed to acquire lock on task result".to_string()))?;
240
241 let inner_result = guard
242 .take()
243 .ok_or_else(|| Error::execution("Task completed but produced no result".to_string()))?;
244
245 let (exit_code, stdout, stderr, container_id) = inner_result
246 .map_err(|e: DaggerReport| Error::execution(format!("Dagger execution failed: {e}")))?;
247
248 if let Some(cid) = container_id
250 && let Ok(mut cache) = container_cache.lock()
251 {
252 cache.insert(task_name_for_cache.clone(), cid);
253 }
254
255 if !capture_output {
257 if !stdout.is_empty() {
258 print!("{}", stdout);
259 }
260 if !stderr.is_empty() {
261 eprint!("{}", stderr);
262 }
263 }
264
265 Ok(TaskResult {
266 name: task_name_for_cache,
267 exit_code: Some(exit_code),
268 stdout: if capture_output {
269 stdout
270 } else {
271 String::new()
272 },
273 stderr: if capture_output {
274 stderr
275 } else {
276 String::new()
277 },
278 success: exit_code == 0,
279 })
280 }
281
282 fn name(&self) -> &'static str {
283 "dagger"
284 }
285}
286
287pub fn create_dagger_backend(
289 config: Option<&BackendConfig>,
290 project_root: std::path::PathBuf,
291) -> Arc<dyn TaskBackend> {
292 let image = config
293 .and_then(|c| c.options.as_ref())
294 .and_then(|o| o.image.clone());
295 Arc::new(DaggerBackend::new(image, project_root))
296}