cuenv_dagger/
lib.rs

1//! Dagger backend for cuenv task execution
2//!
3//! This crate provides the `DaggerBackend` implementation that executes tasks
4//! inside containers using the Dagger SDK.
5
6use async_trait::async_trait;
7use cuenv_core::config::BackendConfig;
8use cuenv_core::environment::Environment;
9use cuenv_core::tasks::{Task, TaskBackend, TaskResult};
10use cuenv_core::{Error, Result};
11use dagger_sdk::{Config, ContainerId, connect_opts};
12use std::collections::HashMap;
13use std::path::Path;
14use std::sync::{Arc, Mutex};
15
/// Boxed, thread-safe error used to carry a failure out of the Dagger session closure.
///
/// `Box<dyn Trait>` defaults to a `'static` object lifetime, so that bound is implied.
type DaggerReport = Box<dyn std::error::Error + Send + Sync>;
17
18/// Dagger backend - executes tasks inside containers using Dagger
19pub struct DaggerBackend {
20    default_image: Option<String>,
21    project_root: std::path::PathBuf,
22    container_cache: Arc<Mutex<HashMap<String, ContainerId>>>,
23}
24
25impl DaggerBackend {
26    pub fn new(default_image: Option<String>, project_root: std::path::PathBuf) -> Self {
27        Self {
28            default_image,
29            project_root,
30            container_cache: Arc::new(Mutex::new(HashMap::new())),
31        }
32    }
33
34    /// Get the container cache for storing/retrieving container IDs
35    pub fn container_cache(&self) -> &Arc<Mutex<HashMap<String, ContainerId>>> {
36        &self.container_cache
37    }
38}
39
40#[async_trait]
41impl TaskBackend for DaggerBackend {
42    async fn execute(
43        &self,
44        name: &str,
45        task: &Task,
46        env: &Environment,
47        _project_root: &Path,
48        capture_output: bool,
49    ) -> Result<TaskResult> {
50        let dagger_config = task.dagger.as_ref();
51
52        // Determine if we're using container chaining (from) or a base image
53        let from_task = dagger_config.and_then(|d| d.from.clone());
54        let image = dagger_config
55            .and_then(|d| d.image.clone())
56            .or_else(|| self.default_image.clone());
57
58        // Validate: must have either 'from' or 'image'
59        if from_task.is_none() && image.is_none() {
60            return Err(Error::configuration(
61                "Dagger backend requires either 'image' or 'from' (task reference). \
62                 Set tasks.<name>.dagger.image, tasks.<name>.dagger.from, or config.backend.options.image"
63                    .to_string(),
64            ));
65        }
66
67        let command: Vec<String> = std::iter::once(task.command.clone())
68            .chain(task.args.clone())
69            .collect();
70
71        if command.is_empty() {
72            return Err(Error::configuration(
73                "Dagger task requires a command to execute".to_string(),
74            ));
75        }
76
77        // Resolve secrets before entering the Dagger closure
78        let mut resolved_secrets: Vec<(String, Option<String>, Option<String>, String)> =
79            Vec::new();
80        if let Some(secrets) = dagger_config.and_then(|d| d.secrets.as_ref()) {
81            for secret in secrets {
82                let plaintext = secret.resolver.resolve().await?;
83                resolved_secrets.push((
84                    secret.name.clone(),
85                    secret.path.clone(),
86                    secret.env_var.clone(),
87                    plaintext,
88                ));
89            }
90        }
91
92        // Get cache mounts
93        let cache_mounts: Vec<(String, String)> = dagger_config
94            .and_then(|d| d.cache.as_ref())
95            .map(|caches| {
96                caches
97                    .iter()
98                    .map(|c| (c.path.clone(), c.name.clone()))
99                    .collect()
100            })
101            .unwrap_or_default();
102
103        // Get container ID from cache if using 'from'
104        let cached_container_id = if let Some(ref from_name) = from_task {
105            let cache = self.container_cache.lock().map_err(|_| {
106                Error::configuration("Failed to acquire container cache lock".to_string())
107            })?;
108            cache.get(from_name).cloned()
109        } else {
110            None
111        };
112
113        // Validate that referenced task exists in cache when using 'from'
114        if let Some(ref from_name) = from_task
115            && cached_container_id.is_none()
116        {
117            return Err(Error::configuration(format!(
118                "Task '{}' references container from task '{}', but no container was found. \
119                 Ensure the referenced task runs first (use dependsOn).",
120                name, from_name
121            )));
122        }
123
124        let env_map = env.vars.clone();
125        let project_root = self.project_root.clone();
126        let task_name = name.to_string();
127        let task_name_for_cache = task_name.clone();
128        let container_cache = self.container_cache.clone();
129
130        // Result store: (exit_code, stdout, stderr, container_id)
131        type ResultType = (i32, String, String, Option<ContainerId>);
132        let result_store: Arc<Mutex<Option<std::result::Result<ResultType, DaggerReport>>>> =
133            Arc::new(Mutex::new(None));
134        let result_store_clone = result_store.clone();
135
136        let cfg = Config::default();
137
138        connect_opts(cfg, move |client| {
139            let project_root = project_root.clone();
140            let image = image.clone();
141            let command = command.clone();
142            let env_map = env_map.clone();
143            let result_store = result_store_clone.clone();
144            let resolved_secrets = resolved_secrets.clone();
145            let cache_mounts = cache_mounts.clone();
146            let cached_container_id = cached_container_id.clone();
147            let task_name_inner = task_name.clone();
148
149            async move {
150                let host_dir = client
151                    .host()
152                    .directory(project_root.to_string_lossy().to_string());
153
154                // Create base container: either from cached container or from image
155                // IMPORTANT: Only mount host directory when starting fresh (not chaining)
156                // to preserve files created in /workspace by previous tasks
157                let mut container = if let Some(container_id) = cached_container_id {
158                    // Continue from previous task's container
159                    // DO NOT re-mount /workspace - it would overwrite files from previous tasks
160                    client
161                        .load_container_from_id(container_id)
162                        .with_workdir("/workspace")
163                } else if let Some(img) = image {
164                    // Start from base image - mount host directory at /workspace
165                    client
166                        .container()
167                        .from(img)
168                        .with_mounted_directory("/workspace", host_dir)
169                        .with_workdir("/workspace")
170                } else {
171                    // This shouldn't happen due to earlier validation
172                    if let Ok(mut guard) = result_store.lock() {
173                        *guard = Some(Err("No image or container reference provided".into()));
174                    }
175                    return Ok(());
176                };
177
178                // Mount cache volumes
179                for (path, cache_name) in &cache_mounts {
180                    let cache_vol = client.cache_volume(cache_name);
181                    container = container.with_mounted_cache(path, cache_vol);
182                }
183
184                // Set up secrets
185                for (secret_name, path, env_var, plaintext) in &resolved_secrets {
186                    let dagger_secret = client.set_secret(secret_name, plaintext);
187
188                    if let Some(file_path) = path {
189                        container = container.with_mounted_secret(file_path, dagger_secret.clone());
190                    }
191                    if let Some(var_name) = env_var {
192                        container = container.with_secret_variable(var_name, dagger_secret);
193                    }
194                }
195
196                // Set environment variables
197                for (k, v) in env_map {
198                    container = container.with_env_variable(k, v);
199                }
200
201                // Execute command
202                let exec = container.with_exec(command);
203
204                // Get results
205                let stdout_res = exec.stdout().await;
206                let stderr_res = exec.stderr().await;
207                let exit_code_res = exec.exit_code().await;
208                let container_id_res = exec.id().await;
209
210                let res = match (stdout_res, stderr_res, exit_code_res, container_id_res) {
211                    (Ok(stdout), Ok(stderr), Ok(exit_code), Ok(container_id)) => {
212                        Ok((exit_code as i32, stdout, stderr, Some(container_id)))
213                    }
214                    (Ok(stdout), Ok(stderr), Ok(exit_code), Err(_)) => {
215                        // Container ID fetch failed but execution succeeded
216                        tracing::warn!(
217                            task = %task_name_inner,
218                            "Failed to get container ID for caching"
219                        );
220                        Ok((exit_code as i32, stdout, stderr, None))
221                    }
222                    (Err(e), _, _, _) => Err(e.into()),
223                    (_, Err(e), _, _) => Err(e.into()),
224                    (_, _, Err(e), _) => Err(e.into()),
225                };
226
227                if let Ok(mut guard) = result_store.lock() {
228                    *guard = Some(res);
229                }
230                Ok(())
231            }
232        })
233        .await
234        .map_err(|err| Error::execution(format!("Dagger backend failed: {err}")))?;
235
236        // Extract result
237        let mut guard = result_store
238            .lock()
239            .map_err(|_| Error::execution("Failed to acquire lock on task result".to_string()))?;
240
241        let inner_result = guard
242            .take()
243            .ok_or_else(|| Error::execution("Task completed but produced no result".to_string()))?;
244
245        let (exit_code, stdout, stderr, container_id) = inner_result
246            .map_err(|e: DaggerReport| Error::execution(format!("Dagger execution failed: {e}")))?;
247
248        // Cache the container ID for potential use by subsequent tasks
249        if let Some(cid) = container_id
250            && let Ok(mut cache) = container_cache.lock()
251        {
252            cache.insert(task_name_for_cache.clone(), cid);
253        }
254
255        // Print output if not capturing
256        if !capture_output {
257            if !stdout.is_empty() {
258                print!("{}", stdout);
259            }
260            if !stderr.is_empty() {
261                eprint!("{}", stderr);
262            }
263        }
264
265        Ok(TaskResult {
266            name: task_name_for_cache,
267            exit_code: Some(exit_code),
268            stdout: if capture_output {
269                stdout
270            } else {
271                String::new()
272            },
273            stderr: if capture_output {
274                stderr
275            } else {
276                String::new()
277            },
278            success: exit_code == 0,
279        })
280    }
281
282    fn name(&self) -> &'static str {
283        "dagger"
284    }
285}
286
287/// Create a Dagger backend from configuration
288pub fn create_dagger_backend(
289    config: Option<&BackendConfig>,
290    project_root: std::path::PathBuf,
291) -> Arc<dyn TaskBackend> {
292    let image = config
293        .and_then(|c| c.options.as_ref())
294        .and_then(|o| o.image.clone());
295    Arc::new(DaggerBackend::new(image, project_root))
296}