cuenv_dagger/
lib.rs

1//! Dagger backend for cuenv task execution
2//!
3//! This crate provides the `DaggerBackend` implementation that executes tasks
4//! inside containers using the Dagger SDK.
5
6use async_trait::async_trait;
7use cuenv_core::config::BackendConfig;
8use cuenv_core::environment::Environment;
9use cuenv_core::tasks::{Task, TaskBackend, TaskResult};
10use cuenv_core::{Error, Result};
11use dagger_sdk::{Config, ContainerId, connect_opts};
12use std::collections::HashMap;
13use std::path::Path;
14use std::sync::{Arc, Mutex};
15
16type DaggerReport = Box<dyn std::error::Error + Send + Sync + 'static>;
17
18/// Dagger backend - executes tasks inside containers using Dagger
19pub struct DaggerBackend {
20    default_image: Option<String>,
21    project_root: std::path::PathBuf,
22    container_cache: Arc<Mutex<HashMap<String, ContainerId>>>,
23}
24
25impl DaggerBackend {
26    pub fn new(default_image: Option<String>, project_root: std::path::PathBuf) -> Self {
27        Self {
28            default_image,
29            project_root,
30            container_cache: Arc::new(Mutex::new(HashMap::new())),
31        }
32    }
33
34    /// Get the container cache for storing/retrieving container IDs
35    pub fn container_cache(&self) -> &Arc<Mutex<HashMap<String, ContainerId>>> {
36        &self.container_cache
37    }
38}
39
40#[async_trait]
41impl TaskBackend for DaggerBackend {
42    async fn execute(
43        &self,
44        name: &str,
45        task: &Task,
46        env: &Environment,
47        _project_root: &Path,
48        capture_output: bool,
49    ) -> Result<TaskResult> {
50        let dagger_config = task.dagger.as_ref();
51
52        // Determine if we're using container chaining (from) or a base image
53        let from_task = dagger_config.and_then(|d| d.from.clone());
54        let image = dagger_config
55            .and_then(|d| d.image.clone())
56            .or_else(|| self.default_image.clone());
57
58        // Validate: must have either 'from' or 'image'
59        if from_task.is_none() && image.is_none() {
60            return Err(Error::configuration(
61                "Dagger backend requires either 'image' or 'from' (task reference). \
62                 Set tasks.<name>.dagger.image, tasks.<name>.dagger.from, or config.backend.options.image"
63                    .to_string(),
64            ));
65        }
66
67        let command: Vec<String> = std::iter::once(task.command.clone())
68            .chain(task.args.clone())
69            .collect();
70
71        if command.is_empty() {
72            return Err(Error::configuration(
73                "Dagger task requires a command to execute".to_string(),
74            ));
75        }
76
77        // Resolve secrets before entering the Dagger closure
78        let mut resolved_secrets: Vec<(String, Option<String>, Option<String>, String)> =
79            Vec::new();
80        if let Some(secrets) = dagger_config.and_then(|d| d.secrets.as_ref()) {
81            for secret in secrets {
82                let plaintext = secret.resolver.resolve().await?;
83                resolved_secrets.push((
84                    secret.name.clone(),
85                    secret.path.clone(),
86                    secret.env_var.clone(),
87                    plaintext,
88                ));
89            }
90        }
91
92        // Get cache mounts
93        let cache_mounts: Vec<(String, String)> = dagger_config
94            .and_then(|d| d.cache.as_ref())
95            .map(|caches| {
96                caches
97                    .iter()
98                    .map(|c| (c.path.clone(), c.name.clone()))
99                    .collect()
100            })
101            .unwrap_or_default();
102
103        // Get container ID from cache if using 'from'
104        let cached_container_id = if let Some(ref from_name) = from_task {
105            let cache = self.container_cache.lock().map_err(|_| {
106                Error::configuration("Failed to acquire container cache lock".to_string())
107            })?;
108            cache.get(from_name).cloned()
109        } else {
110            None
111        };
112
113        // Validate that referenced task exists in cache when using 'from'
114        if from_task.is_some() && cached_container_id.is_none() {
115            return Err(Error::configuration(format!(
116                "Task '{}' references container from task '{}', but no container was found. \
117                 Ensure the referenced task runs first (use dependsOn).",
118                name,
119                from_task.as_ref().unwrap()
120            )));
121        }
122
123        let env_map = env.vars.clone();
124        let project_root = self.project_root.clone();
125        let task_name = name.to_string();
126        let task_name_for_cache = task_name.clone();
127        let container_cache = self.container_cache.clone();
128
129        // Result store: (exit_code, stdout, stderr, container_id)
130        type ResultType = (i32, String, String, Option<ContainerId>);
131        let result_store: Arc<Mutex<Option<std::result::Result<ResultType, DaggerReport>>>> =
132            Arc::new(Mutex::new(None));
133        let result_store_clone = result_store.clone();
134
135        let cfg = Config::default();
136
137        connect_opts(cfg, move |client| {
138            let project_root = project_root.clone();
139            let image = image.clone();
140            let command = command.clone();
141            let env_map = env_map.clone();
142            let result_store = result_store_clone.clone();
143            let resolved_secrets = resolved_secrets.clone();
144            let cache_mounts = cache_mounts.clone();
145            let cached_container_id = cached_container_id.clone();
146            let task_name_inner = task_name.clone();
147
148            async move {
149                let host_dir = client
150                    .host()
151                    .directory(project_root.to_string_lossy().to_string());
152
153                // Create base container: either from cached container or from image
154                // IMPORTANT: Only mount host directory when starting fresh (not chaining)
155                // to preserve files created in /workspace by previous tasks
156                let mut container = if let Some(container_id) = cached_container_id {
157                    // Continue from previous task's container
158                    // DO NOT re-mount /workspace - it would overwrite files from previous tasks
159                    client
160                        .load_container_from_id(container_id)
161                        .with_workdir("/workspace")
162                } else if let Some(img) = image {
163                    // Start from base image - mount host directory at /workspace
164                    client
165                        .container()
166                        .from(img)
167                        .with_mounted_directory("/workspace", host_dir)
168                        .with_workdir("/workspace")
169                } else {
170                    // This shouldn't happen due to earlier validation
171                    if let Ok(mut guard) = result_store.lock() {
172                        *guard = Some(Err("No image or container reference provided".into()));
173                    }
174                    return Ok(());
175                };
176
177                // Mount cache volumes
178                for (path, cache_name) in &cache_mounts {
179                    let cache_vol = client.cache_volume(cache_name);
180                    container = container.with_mounted_cache(path, cache_vol);
181                }
182
183                // Set up secrets
184                for (secret_name, path, env_var, plaintext) in &resolved_secrets {
185                    let dagger_secret = client.set_secret(secret_name, plaintext);
186
187                    if let Some(file_path) = path {
188                        container = container.with_mounted_secret(file_path, dagger_secret.clone());
189                    }
190                    if let Some(var_name) = env_var {
191                        container = container.with_secret_variable(var_name, dagger_secret);
192                    }
193                }
194
195                // Set environment variables
196                for (k, v) in env_map {
197                    container = container.with_env_variable(k, v);
198                }
199
200                // Execute command
201                let exec = container.with_exec(command);
202
203                // Get results
204                let stdout_res = exec.stdout().await;
205                let stderr_res = exec.stderr().await;
206                let exit_code_res = exec.exit_code().await;
207                let container_id_res = exec.id().await;
208
209                let res = match (stdout_res, stderr_res, exit_code_res, container_id_res) {
210                    (Ok(stdout), Ok(stderr), Ok(exit_code), Ok(container_id)) => {
211                        Ok((exit_code as i32, stdout, stderr, Some(container_id)))
212                    }
213                    (Ok(stdout), Ok(stderr), Ok(exit_code), Err(_)) => {
214                        // Container ID fetch failed but execution succeeded
215                        tracing::warn!(
216                            task = %task_name_inner,
217                            "Failed to get container ID for caching"
218                        );
219                        Ok((exit_code as i32, stdout, stderr, None))
220                    }
221                    (Err(e), _, _, _) => Err(e.into()),
222                    (_, Err(e), _, _) => Err(e.into()),
223                    (_, _, Err(e), _) => Err(e.into()),
224                };
225
226                if let Ok(mut guard) = result_store.lock() {
227                    *guard = Some(res);
228                }
229                Ok(())
230            }
231        })
232        .await
233        .map_err(|err| Error::execution(format!("Dagger backend failed: {err}")))?;
234
235        // Extract result
236        let mut guard = result_store
237            .lock()
238            .map_err(|_| Error::execution("Failed to acquire lock on task result".to_string()))?;
239
240        let inner_result = guard
241            .take()
242            .ok_or_else(|| Error::execution("Task completed but produced no result".to_string()))?;
243
244        let (exit_code, stdout, stderr, container_id) = inner_result
245            .map_err(|e: DaggerReport| Error::execution(format!("Dagger execution failed: {e}")))?;
246
247        // Cache the container ID for potential use by subsequent tasks
248        if let Some(cid) = container_id
249            && let Ok(mut cache) = container_cache.lock()
250        {
251            cache.insert(task_name_for_cache.clone(), cid);
252        }
253
254        // Print output if not capturing
255        if !capture_output {
256            if !stdout.is_empty() {
257                print!("{}", stdout);
258            }
259            if !stderr.is_empty() {
260                eprint!("{}", stderr);
261            }
262        }
263
264        Ok(TaskResult {
265            name: task_name_for_cache,
266            exit_code: Some(exit_code),
267            stdout: if capture_output {
268                stdout
269            } else {
270                String::new()
271            },
272            stderr: if capture_output {
273                stderr
274            } else {
275                String::new()
276            },
277            success: exit_code == 0,
278        })
279    }
280
281    fn name(&self) -> &'static str {
282        "dagger"
283    }
284}
285
286/// Create a Dagger backend from configuration
287pub fn create_dagger_backend(
288    config: Option<&BackendConfig>,
289    project_root: std::path::PathBuf,
290) -> Arc<dyn TaskBackend> {
291    let image = config
292        .and_then(|c| c.options.as_ref())
293        .and_then(|o| o.image.clone());
294    Arc::new(DaggerBackend::new(image, project_root))
295}