capsula_server/
lib.rs

1use askama::Template;
2use askama_web::WebTemplate;
3
4#[expect(
5    clippy::unnecessary_wraps,
6    reason = "Askama filter_fn macro generates code that triggers this lint, but Result return type is required by the Askama filter API"
7)]
8mod filters {
9    use askama::filter_fn;
10
11    #[filter_fn]
12    pub fn format_command(s: &str, _env: &dyn askama::Values) -> ::askama::Result<String> {
13        // Try to parse as JSON array
14        Ok(serde_json::from_str::<Vec<String>>(s).map_or_else(
15            |_| s.to_string(),
16            |cmd_array| {
17                shlex::try_join(cmd_array.iter().map(String::as_str))
18                    .unwrap_or_else(|_| cmd_array.join(" "))
19            },
20        ))
21    }
22
23    #[filter_fn]
24    pub fn pretty_json<T: std::fmt::Display>(
25        s: T,
26        _env: &dyn askama::Values,
27    ) -> ::askama::Result<String> {
28        let s_str = s.to_string();
29        // Try to parse as JSON and pretty-print with 2-space indentation
30        Ok(serde_json::from_str::<serde_json::Value>(&s_str)
31            .ok()
32            .and_then(|value| serde_json::to_string_pretty(&value).ok())
33            .unwrap_or(s_str))
34    }
35}
36
37use axum::{
38    Router,
39    body::Body,
40    extract::{DefaultBodyLimit, Multipart, Path, Query, State},
41    http::{StatusCode, header},
42    response::{IntoResponse, Json, Response},
43    routing::{get, post},
44};
45use capsula_api_types::VaultInfo;
46use serde_json::json;
47use sha2::{Digest, Sha256};
48use sqlx::PgPool;
49use std::collections::VecDeque;
50use std::path::PathBuf;
51use tower_http::services::ServeDir;
52use tracing::{error, info, warn};
53
54#[derive(Clone)]
55pub struct AppState {
56    pub pool: PgPool,
57    pub storage_path: PathBuf,
58}
59
60mod models;
61
62#[derive(Template, WebTemplate)]
63#[template(path = "index.html")]
64struct IndexTemplate;
65
66#[derive(Template, WebTemplate)]
67#[template(path = "vaults.html")]
68struct VaultsTemplate {
69    vaults: Vec<VaultInfo>,
70}
71
72#[derive(Template, WebTemplate)]
73#[template(path = "runs.html")]
74struct RunsTemplate {
75    runs: Vec<models::Run>,
76    vault: Option<String>,
77    page: i64,
78    total_pages: i64,
79}
80
81#[derive(Template, WebTemplate)]
82#[template(path = "run_detail.html")]
83struct RunDetailTemplate {
84    run: models::Run,
85    pre_run_hooks: Vec<models::HookOutput>,
86    post_run_hooks: Vec<models::HookOutput>,
87    files: Vec<models::CapturedFile>,
88}
89
90#[derive(Template, WebTemplate)]
91#[template(path = "error.html")]
92struct ErrorTemplate {
93    status_code: u16,
94    title: String,
95    message: String,
96}
97
98pub async fn create_pool(database_url: &str, max_connections: u32) -> Result<PgPool, sqlx::Error> {
99    sqlx::postgres::PgPoolOptions::new()
100        .max_connections(max_connections)
101        .acquire_timeout(std::time::Duration::from_secs(3))
102        .connect(database_url)
103        .await
104}
105
106pub fn build_app(pool: PgPool, storage_path: PathBuf, max_body_size: usize) -> Router {
107    let static_dir: PathBuf = std::env::var("CAPSULA_STATIC_DIR")
108        .expect("CAPSULA_STATIC_DIR environment variable must be set")
109        .into();
110
111    let state = AppState { pool, storage_path };
112
113    Router::new()
114        .route("/", get(index))
115        .route("/vaults", get(vaults_page))
116        .route("/runs", get(runs_page))
117        .route("/runs/{id}", get(run_detail_page))
118        .route("/health", get(health_check))
119        .route("/api/v1/vaults", get(list_vaults))
120        .route("/api/v1/vaults/{name}", get(get_vault_info))
121        .route("/api/v1/runs", post(create_run).get(list_runs))
122        .route("/api/v1/runs/{id}", get(get_run))
123        .route("/api/v1/runs/{id}/files/{*path}", get(download_file))
124        .route(
125            "/api/v1/upload",
126            post(upload_files).layer(DefaultBodyLimit::max(max_body_size)),
127        )
128        .nest_service("/static", ServeDir::new(static_dir))
129        .fallback(not_found)
130        .with_state(state)
131}
132
133async fn index() -> impl IntoResponse {
134    IndexTemplate
135}
136
137async fn not_found() -> impl IntoResponse {
138    (
139        StatusCode::NOT_FOUND,
140        ErrorTemplate {
141            status_code: 404,
142            title: "Page Not Found".to_string(),
143            message: "The page you are looking for does not exist.".to_string(),
144        },
145    )
146}
147
148async fn vaults_page(State(state): State<AppState>) -> impl IntoResponse {
149    info!("Rendering vaults page");
150
151    let result = sqlx::query!(
152        r#"
153        SELECT vault as name, COUNT(*) as "run_count!"
154        FROM runs
155        GROUP BY vault
156        ORDER BY vault
157        "#
158    )
159    .fetch_all(&state.pool)
160    .await;
161
162    match result {
163        Ok(rows) => {
164            let vaults: Vec<VaultInfo> = rows
165                .into_iter()
166                .map(|row| VaultInfo {
167                    name: row.name,
168                    run_count: row.run_count,
169                })
170                .collect();
171            VaultsTemplate { vaults }
172        }
173        Err(e) => {
174            error!("Failed to fetch vaults: {}", e);
175            // Return empty vaults on error
176            VaultsTemplate { vaults: Vec::new() }
177        }
178    }
179}
180
181async fn runs_page(
182    State(state): State<AppState>,
183    Query(params): Query<models::ListRunsQuery>,
184) -> impl IntoResponse {
185    let page = params.offset.unwrap_or(0) / params.limit.unwrap_or(50) + 1;
186    let limit = params.limit.unwrap_or(50);
187    let offset = (page - 1) * limit;
188
189    info!(
190        "Rendering runs page: page={}, vault={:?}",
191        page, params.vault
192    );
193
194    // Get total count for pagination
195    let total_count = if let Some(ref vault) = params.vault {
196        sqlx::query_scalar!(
197            r#"
198            SELECT COUNT(*)::bigint
199            FROM runs
200            WHERE vault = $1
201            "#,
202            vault
203        )
204        .fetch_one(&state.pool)
205        .await
206        .unwrap_or(Some(0))
207        .unwrap_or(0)
208    } else {
209        sqlx::query_scalar!(
210            r#"
211            SELECT COUNT(*)::bigint
212            FROM runs
213            "#
214        )
215        .fetch_one(&state.pool)
216        .await
217        .unwrap_or(Some(0))
218        .unwrap_or(0)
219    };
220
221    let total_pages = (total_count + limit - 1) / limit;
222
223    // Fetch runs
224    let runs_result = if let Some(ref vault) = params.vault {
225        sqlx::query_as!(
226            models::Run,
227            r#"
228            SELECT id, name, timestamp, command, vault, project_root,
229                   exit_code, duration_ms, stdout, stderr,
230                   created_at, updated_at
231            FROM runs
232            WHERE vault = $1
233            ORDER BY timestamp DESC
234            LIMIT $2 OFFSET $3
235            "#,
236            vault,
237            limit,
238            offset
239        )
240        .fetch_all(&state.pool)
241        .await
242    } else {
243        sqlx::query_as!(
244            models::Run,
245            r#"
246            SELECT id, name, timestamp, command, vault, project_root,
247                   exit_code, duration_ms, stdout, stderr,
248                   created_at, updated_at
249            FROM runs
250            ORDER BY timestamp DESC
251            LIMIT $1 OFFSET $2
252            "#,
253            limit,
254            offset
255        )
256        .fetch_all(&state.pool)
257        .await
258    };
259
260    match runs_result {
261        Ok(runs) => RunsTemplate {
262            runs,
263            vault: params.vault,
264            page,
265            total_pages,
266        },
267        Err(e) => {
268            error!("Failed to fetch runs: {}", e);
269            RunsTemplate {
270                runs: Vec::new(),
271                vault: params.vault,
272                page: 1,
273                total_pages: 1,
274            }
275        }
276    }
277}
278
279async fn run_detail_page(
280    State(state): State<AppState>,
281    Path(id): Path<String>,
282) -> Result<RunDetailTemplate, StatusCode> {
283    info!("Rendering run detail page for: {}", id);
284
285    // Fetch run metadata
286    let run = sqlx::query_as!(
287        models::Run,
288        r#"
289        SELECT id, name, timestamp, command, vault, project_root,
290               exit_code, duration_ms, stdout, stderr,
291               created_at, updated_at
292        FROM runs
293        WHERE id = $1
294        "#,
295        id
296    )
297    .fetch_optional(&state.pool)
298    .await
299    .map_err(|e| {
300        error!("Database error while fetching run: {}", e);
301        StatusCode::INTERNAL_SERVER_ERROR
302    })?
303    .ok_or_else(|| {
304        info!("Run not found: {}", id);
305        StatusCode::NOT_FOUND
306    })?;
307
308    // Fetch hook outputs
309    let hook_outputs_result = sqlx::query_as!(
310        models::RunOutputRow,
311        r#"
312        SELECT phase, hook_id, config, output, success, error
313        FROM run_outputs
314        WHERE run_id = $1
315        ORDER BY id
316        "#,
317        id
318    )
319    .fetch_all(&state.pool)
320    .await;
321
322    let (pre_run_hooks, post_run_hooks) = match hook_outputs_result {
323        Ok(rows) => {
324            let mut pre_hooks = Vec::new();
325            let mut post_hooks = Vec::new();
326
327            for row in rows {
328                let hook_output = models::HookOutput {
329                    meta: models::HookMeta {
330                        id: row.hook_id,
331                        config: row.config,
332                        success: row.success,
333                        error: row.error,
334                    },
335                    output: row.output,
336                };
337
338                if row.phase == "pre" {
339                    pre_hooks.push(hook_output);
340                } else if row.phase == "post" {
341                    post_hooks.push(hook_output);
342                } else {
343                    warn!("Unknown hook phase: {}", row.phase);
344                }
345            }
346
347            (pre_hooks, post_hooks)
348        }
349        Err(e) => {
350            error!("Failed to fetch hook outputs: {}", e);
351            (Vec::new(), Vec::new())
352        }
353    };
354
355    // Fetch captured files
356    let files_result = sqlx::query_as!(
357        models::CapturedFile,
358        r#"
359        SELECT path, size, hash, storage_path, content_type
360        FROM captured_files
361        WHERE run_id = $1
362        ORDER BY path
363        "#,
364        id
365    )
366    .fetch_all(&state.pool)
367    .await;
368
369    let files = match files_result {
370        Ok(files) => files,
371        Err(e) => {
372            error!("Failed to fetch captured files: {}", e);
373            Vec::new()
374        }
375    };
376
377    Ok(RunDetailTemplate {
378        run,
379        pre_run_hooks,
380        post_run_hooks,
381        files,
382    })
383}
384
385async fn health_check(State(state): State<AppState>) -> impl IntoResponse {
386    match sqlx::query("SELECT 1").fetch_one(&state.pool).await {
387        Ok(_) => Json(json!({
388            "status": "ok",
389            "database": "connected"
390        })),
391        Err(e) => Json(json!({
392            "status": "error",
393            "database": "disconnected",
394            "error": e.to_string()
395        })),
396    }
397}
398
399async fn list_vaults(State(state): State<AppState>) -> impl IntoResponse {
400    info!("Listing all vaults");
401
402    let result = sqlx::query!(
403        r#"
404        SELECT vault as name, COUNT(*) as "run_count!"
405        FROM runs
406        GROUP BY vault
407        ORDER BY vault
408        "#
409    )
410    .fetch_all(&state.pool)
411    .await;
412
413    match result {
414        Ok(rows) => {
415            let vaults: Vec<VaultInfo> = rows
416                .into_iter()
417                .map(|row| VaultInfo {
418                    name: row.name,
419                    run_count: row.run_count,
420                })
421                .collect();
422            info!("Found {} vaults", vaults.len());
423            let response = capsula_api_types::VaultsResponse {
424                status: "ok".to_string(),
425                vaults,
426            };
427            Json(serde_json::to_value(response).expect("Failed to serialize VaultsResponse"))
428        }
429        Err(e) => {
430            error!("Failed to list vaults: {}", e);
431            let response = capsula_api_types::ErrorResponse {
432                status: "error".to_string(),
433                error: e.to_string(),
434            };
435            Json(serde_json::to_value(response).expect("Failed to serialize ErrorResponse"))
436        }
437    }
438}
439
440async fn get_vault_info(
441    State(state): State<AppState>,
442    Path(name): Path<String>,
443) -> impl IntoResponse {
444    info!("Getting vault info: {}", name);
445
446    let result = sqlx::query!(
447        r#"
448        SELECT vault as name, COUNT(*) as "run_count!"
449        FROM runs
450        WHERE vault = $1
451        GROUP BY vault
452        "#,
453        name
454    )
455    .fetch_optional(&state.pool)
456    .await;
457
458    match result {
459        Ok(Some(row)) => {
460            let vault = VaultInfo {
461                name: row.name,
462                run_count: row.run_count,
463            };
464            info!("Found vault: {} with {} runs", vault.name, vault.run_count);
465            let response = capsula_api_types::VaultExistsResponse {
466                status: "ok".to_string(),
467                exists: true,
468                vault: Some(vault),
469            };
470            Json(serde_json::to_value(response).expect("Failed to serialize VaultExistsResponse"))
471        }
472        Ok(None) => {
473            info!("Vault not found: {}", name);
474            let response = capsula_api_types::VaultExistsResponse {
475                status: "ok".to_string(),
476                exists: false,
477                vault: None,
478            };
479            Json(serde_json::to_value(response).expect("Failed to serialize VaultExistsResponse"))
480        }
481        Err(e) => {
482            error!("Failed to get vault info: {}", e);
483            let response = capsula_api_types::ErrorResponse {
484                status: "error".to_string(),
485                error: e.to_string(),
486            };
487            Json(serde_json::to_value(response).expect("Failed to serialize ErrorResponse"))
488        }
489    }
490}
491
492async fn list_runs(
493    State(state): State<AppState>,
494    Query(params): Query<models::ListRunsQuery>,
495) -> impl IntoResponse {
496    let limit = params.limit.unwrap_or(100);
497    let offset = params.offset.unwrap_or(0);
498
499    if let Some(ref vault) = params.vault {
500        info!(
501            "Listing runs for vault: {} (limit={}, offset={})",
502            vault, limit, offset
503        );
504    } else {
505        info!("Listing all runs (limit={}, offset={})", limit, offset);
506    }
507
508    let result = if let Some(vault) = params.vault {
509        sqlx::query_as!(
510            models::Run,
511            r#"
512            SELECT id, name, timestamp, command, vault, project_root,
513                   exit_code, duration_ms, stdout, stderr,
514                   created_at, updated_at
515            FROM runs
516            WHERE vault = $1
517            ORDER BY timestamp DESC
518            LIMIT $2 OFFSET $3
519            "#,
520            vault,
521            limit,
522            offset
523        )
524        .fetch_all(&state.pool)
525        .await
526    } else {
527        sqlx::query_as!(
528            models::Run,
529            r#"
530            SELECT id, name, timestamp, command, vault, project_root,
531                   exit_code, duration_ms, stdout, stderr,
532                   created_at, updated_at
533            FROM runs
534            ORDER BY timestamp DESC
535            LIMIT $1 OFFSET $2
536            "#,
537            limit,
538            offset
539        )
540        .fetch_all(&state.pool)
541        .await
542    };
543
544    match result {
545        Ok(runs) => {
546            info!("Found {} runs", runs.len());
547            Json(json!({
548                "status": "ok",
549                "runs": runs,
550                "limit": limit,
551                "offset": offset
552            }))
553        }
554        Err(e) => {
555            error!("Failed to list runs: {}", e);
556            Json(json!({
557                "status": "error",
558                "error": e.to_string()
559            }))
560        }
561    }
562}
563
564async fn create_run(
565    State(state): State<AppState>,
566    Json(request): Json<models::CreateRunRequest>,
567) -> impl IntoResponse {
568    info!("Creating run: {}", request.id);
569
570    let result = sqlx::query!(
571        r#"
572        INSERT INTO runs (
573            id, name, timestamp, command, vault, project_root,
574            exit_code, duration_ms, stdout, stderr
575        ) VALUES (
576            $1, $2, $3, $4, $5, $6,
577            $7, $8, $9, $10
578        )
579        "#,
580        request.id,
581        request.name,
582        request.timestamp,
583        request.command,
584        request.vault,
585        request.project_root,
586        request.exit_code,
587        request.duration_ms,
588        request.stdout,
589        request.stderr
590    )
591    .execute(&state.pool)
592    .await;
593
594    match result {
595        Ok(_) => {
596            info!("Run created successfully");
597            (
598                StatusCode::CREATED,
599                Json(json!({
600                    "status": "created",
601                    "run": request
602                })),
603            )
604        }
605        Err(e) => {
606            error!("Failed to insert run: {}", e);
607            let status = if e.to_string().contains("duplicate key") {
608                StatusCode::CONFLICT
609            } else {
610                StatusCode::INTERNAL_SERVER_ERROR
611            };
612            (
613                status,
614                Json(json!({
615                    "status": "error",
616                    "error": e.to_string()
617                })),
618            )
619        }
620    }
621}
622
623async fn get_run(State(state): State<AppState>, Path(id): Path<String>) -> impl IntoResponse {
624    info!("Getting run: {}", id);
625
626    let result = sqlx::query_as!(
627        models::Run,
628        r#"
629        SELECT id, name, timestamp, command, vault, project_root,
630               exit_code, duration_ms, stdout, stderr,
631               created_at, updated_at
632        FROM runs
633        WHERE id = $1
634        "#,
635        id
636    )
637    .fetch_optional(&state.pool)
638    .await;
639
640    match result {
641        Ok(Some(run)) => {
642            info!("Found run: {}", run.id);
643
644            // Fetch hook outputs
645            let hook_outputs_result = sqlx::query_as!(
646                models::RunOutputRow,
647                r#"
648                SELECT phase, hook_id, config, output, success, error
649                FROM run_outputs
650                WHERE run_id = $1
651                ORDER BY id
652                "#,
653                id
654            )
655            .fetch_all(&state.pool)
656            .await;
657
658            let (pre_run_hooks, post_run_hooks) = match hook_outputs_result {
659                Ok(rows) => {
660                    let mut pre_hooks = Vec::new();
661                    let mut post_hooks = Vec::new();
662
663                    for row in rows {
664                        let hook_output = models::HookOutput {
665                            meta: models::HookMeta {
666                                id: row.hook_id,
667                                config: row.config,
668                                success: row.success,
669                                error: row.error,
670                            },
671                            output: row.output,
672                        };
673
674                        if row.phase == "pre" {
675                            pre_hooks.push(hook_output);
676                        } else if row.phase == "post" {
677                            post_hooks.push(hook_output);
678                        } else {
679                            warn!("Unknown hook phase: {}", row.phase);
680                        }
681                    }
682
683                    info!(
684                        "Found {} pre-run hooks and {} post-run hooks",
685                        pre_hooks.len(),
686                        post_hooks.len()
687                    );
688                    (pre_hooks, post_hooks)
689                }
690                Err(e) => {
691                    error!("Failed to fetch hook outputs: {}", e);
692                    (Vec::new(), Vec::new())
693                }
694            };
695
696            Json(json!({
697                "status": "ok",
698                "run": run,
699                "pre_run_hooks": pre_run_hooks,
700                "post_run_hooks": post_run_hooks
701            }))
702        }
703        Ok(None) => {
704            info!("Run not found: {}", id);
705            Json(json!({
706                "status": "not_found",
707                "error": format!("Run with id {} not found", id)
708            }))
709        }
710        Err(e) => {
711            error!("Failed to retrieve run: {}", e);
712            Json(json!({
713                "status": "error",
714                "error": e.to_string()
715            }))
716        }
717    }
718}
719
720#[expect(clippy::too_many_lines, reason = "TODO: Refactor later")]
721#[expect(
722    clippy::else_if_without_else,
723    reason = "There is `continue` or `return` in each branch, so `else` is redundant"
724)]
725async fn upload_files(
726    State(state): State<AppState>,
727    mut multipart: Multipart,
728) -> impl IntoResponse {
729    let storage_path = &state.storage_path;
730    info!("Received file upload request");
731
732    if let Err(e) = tokio::fs::create_dir_all(&storage_path).await {
733        error!(
734            "Failed to create storage directory at {}: {}",
735            storage_path.display(),
736            e
737        );
738        return Json(json!({
739            "status": "error",
740            "error": format!("Failed to create storage directory: {}", e)
741        }));
742    }
743
744    let mut files_processed = 0;
745    let mut total_bytes = 0u64;
746    let mut run_id: Option<String> = None;
747    let mut pending_paths: VecDeque<String> = VecDeque::new();
748    let mut pre_run_hooks: Option<Vec<models::HookOutput>> = None;
749    let mut post_run_hooks: Option<Vec<models::HookOutput>> = None;
750
751    while let Ok(Some(field)) = multipart.next_field().await {
752        let field_name = field.name().unwrap_or("unknown").to_string();
753
754        if field_name == "run_id" {
755            match field.text().await {
756                Ok(text) => {
757                    run_id = Some(text);
758                    continue;
759                }
760                Err(e) => {
761                    error!("Failed to read run_id field: {}", e);
762                    return Json(json!({
763                        "status": "error",
764                        "error": format!("Failed to read run_id: {}", e)
765                    }));
766                }
767            }
768        } else if field_name == "path" {
769            match field.text().await {
770                Ok(text) => {
771                    pending_paths.push_back(text);
772                    continue;
773                }
774                Err(e) => {
775                    error!("Failed to read path field: {}", e);
776                    return Json(json!({
777                        "status": "error",
778                        "error": format!("Failed to read path: {}", e)
779                    }));
780                }
781            }
782        } else if field_name == "pre_run" {
783            match field.text().await {
784                Ok(text) => match serde_json::from_str::<Vec<models::HookOutput>>(&text) {
785                    Ok(hooks) => {
786                        info!("Parsed {} pre-run hooks", hooks.len());
787                        pre_run_hooks = Some(hooks);
788                        continue;
789                    }
790                    Err(e) => {
791                        error!("Failed to parse pre_run JSON: {}", e);
792                        return Json(json!({
793                            "status": "error",
794                            "error": format!("Failed to parse pre_run JSON: {}", e)
795                        }));
796                    }
797                },
798                Err(e) => {
799                    error!("Failed to read pre_run field: {}", e);
800                    return Json(json!({
801                        "status": "error",
802                        "error": format!("Failed to read pre_run: {}", e)
803                    }));
804                }
805            }
806        } else if field_name == "post_run" {
807            match field.text().await {
808                Ok(text) => match serde_json::from_str::<Vec<models::HookOutput>>(&text) {
809                    Ok(hooks) => {
810                        info!("Parsed {} post-run hooks", hooks.len());
811                        post_run_hooks = Some(hooks);
812                        continue;
813                    }
814                    Err(e) => {
815                        error!("Failed to parse post_run JSON: {}", e);
816                        return Json(json!({
817                            "status": "error",
818                            "error": format!("Failed to parse post_run JSON: {}", e)
819                        }));
820                    }
821                },
822                Err(e) => {
823                    error!("Failed to read post_run field: {}", e);
824                    return Json(json!({
825                        "status": "error",
826                        "error": format!("Failed to read post_run: {}", e)
827                    }));
828                }
829            }
830        }
831
832        let file_name = field.file_name().unwrap_or("unknown").to_string();
833        let content_type = field
834            .content_type()
835            .unwrap_or("application/octet-stream")
836            .to_string();
837
838        info!(
839            "Processing file: field_name={}, file_name={}, content_type={}",
840            field_name, file_name, content_type
841        );
842
843        match field.bytes().await {
844            Ok(data) => {
845                let size = data.len();
846                total_bytes += size as u64;
847                let Ok(size_i64) = i64::try_from(size) else {
848                    error!("File too large to store size in database: {} bytes", size);
849                    return Json(json!({
850                        "status": "error",
851                        "error": "File too large to store size in database"
852                    }));
853                };
854
855                let mut hasher = Sha256::new();
856                hasher.update(&data);
857                let hash = format!("{:x}", hasher.finalize());
858
859                info!("File hash: {}, size: {} bytes", hash, size);
860
861                let hash_dir = &hash[0..2];
862                let file_storage_dir = storage_path.join(hash_dir);
863                if let Err(e) = tokio::fs::create_dir_all(&file_storage_dir).await {
864                    error!(
865                        "Failed to create hash directory at {}: {}",
866                        file_storage_dir.display(),
867                        e
868                    );
869                    return Json(json!({
870                        "status": "error",
871                        "error": format!("Failed to create storage directory: {}", e)
872                    }));
873                }
874
875                let file_storage_path = file_storage_dir.join(&hash);
876                let storage_path_str = file_storage_path.to_string_lossy().to_string();
877
878                if file_storage_path.exists() {
879                    info!("File already exists (deduplicated): {}", storage_path_str);
880                } else {
881                    if let Err(e) = tokio::fs::write(&file_storage_path, &data).await {
882                        error!("Failed to write file: {}", e);
883                        return Json(json!({
884                            "status": "error",
885                            "error": format!("Failed to write file: {}", e)
886                        }));
887                    }
888                    info!("Saved new file: {}", storage_path_str);
889                }
890
891                if let Some(ref rid) = run_id {
892                    let relative_path = pending_paths
893                        .pop_front()
894                        .or_else(|| {
895                            if file_name == "unknown" {
896                                None
897                            } else {
898                                Some(file_name.clone())
899                            }
900                        })
901                        .unwrap_or_else(|| {
902                            if field_name != "unknown" && field_name != "file" {
903                                field_name.clone()
904                            } else {
905                                format!("file-{}", files_processed + 1)
906                            }
907                        });
908
909                    let result = sqlx::query!(
910                        r#"
911                        INSERT INTO captured_files (run_id, path, size, hash, storage_path, content_type)
912                        VALUES ($1, $2, $3, $4, $5, $6)
913                        ON CONFLICT (run_id, path) DO UPDATE
914                        SET size = EXCLUDED.size,
915                            hash = EXCLUDED.hash,
916                            storage_path = EXCLUDED.storage_path,
917                            content_type = EXCLUDED.content_type
918                        "#,
919                        rid,
920                        relative_path,
921                        size_i64,
922                        hash,
923                        storage_path_str,
924                        content_type
925                    )
926                    .execute(&state.pool)
927                    .await;
928
929                    match result {
930                        Ok(_) => {
931                            info!("Stored file metadata in database");
932                        }
933                        Err(e) => {
934                            error!("Failed to store file metadata: {}", e);
935                            return Json(json!({
936                                "status": "error",
937                                "error": format!("Failed to store file metadata: {}", e)
938                            }));
939                        }
940                    }
941                }
942
943                files_processed += 1;
944                info!("Successfully processed file: {} bytes", size);
945            }
946            Err(e) => {
947                error!("Failed to read file field: {}", e);
948                return Json(json!({
949                    "status": "error",
950                    "error": format!("Failed to read file: {}", e)
951                }));
952            }
953        }
954    }
955
956    info!(
957        "Upload complete: {} files, {} bytes total",
958        files_processed, total_bytes
959    );
960
961    // Store hook outputs if provided
962    let mut pre_run_count = 0;
963    let mut post_run_count = 0;
964
965    if let Some(ref rid) = run_id {
966        if let Some(hooks) = pre_run_hooks {
967            for hook in hooks {
968                let result = sqlx::query!(
969                    r#"
970                    INSERT INTO run_outputs (run_id, phase, hook_id, config, output, success, error)
971                    VALUES ($1, 'pre', $2, $3, $4, $5, $6)
972                    "#,
973                    rid,
974                    hook.meta.id,
975                    hook.meta.config,
976                    hook.output,
977                    hook.meta.success,
978                    hook.meta.error
979                )
980                .execute(&state.pool)
981                .await;
982
983                match result {
984                    Ok(_) => {
985                        pre_run_count += 1;
986                        info!("Stored pre-run hook: {}", hook.meta.id);
987                    }
988                    Err(e) => {
989                        error!("Failed to store pre-run hook {}: {}", hook.meta.id, e);
990                        return Json(json!({
991                            "status": "error",
992                            "error": format!("Failed to store pre-run hook {}: {}", hook.meta.id, e)
993                        }));
994                    }
995                }
996            }
997        }
998
999        if let Some(hooks) = post_run_hooks {
1000            for hook in hooks {
1001                let result = sqlx::query!(
1002                    r#"
1003                    INSERT INTO run_outputs (run_id, phase, hook_id, config, output, success, error)
1004                    VALUES ($1, 'post', $2, $3, $4, $5, $6)
1005                    "#,
1006                    rid,
1007                    hook.meta.id,
1008                    hook.meta.config,
1009                    hook.output,
1010                    hook.meta.success,
1011                    hook.meta.error
1012                )
1013                .execute(&state.pool)
1014                .await;
1015
1016                match result {
1017                    Ok(_) => {
1018                        post_run_count += 1;
1019                        info!("Stored post-run hook: {}", hook.meta.id);
1020                    }
1021                    Err(e) => {
1022                        error!("Failed to store post-run hook {}: {}", hook.meta.id, e);
1023                        return Json(json!({
1024                            "status": "error",
1025                            "error": format!("Failed to store post-run hook {}: {}", hook.meta.id, e)
1026                        }));
1027                    }
1028                }
1029            }
1030        }
1031    }
1032
1033    info!(
1034        "Hook outputs stored: {} pre-run, {} post-run",
1035        pre_run_count, post_run_count
1036    );
1037
1038    let response = capsula_api_types::UploadResponse {
1039        status: "ok".to_string(),
1040        files_processed,
1041        total_bytes,
1042        pre_run_hooks: pre_run_count,
1043        post_run_hooks: post_run_count,
1044    };
1045    Json(serde_json::to_value(response).expect("Failed to serialize UploadResponse"))
1046}
1047
1048async fn download_file(
1049    State(state): State<AppState>,
1050    Path((run_id, file_path)): Path<(String, String)>,
1051) -> Result<Response, StatusCode> {
1052    info!("Downloading file: run_id={}, path={}", run_id, file_path);
1053
1054    // Query the database for file metadata
1055    let file_record = sqlx::query!(
1056        r#"
1057        SELECT storage_path, content_type, path as file_path
1058        FROM captured_files
1059        WHERE run_id = $1 AND path = $2
1060        "#,
1061        run_id,
1062        file_path
1063    )
1064    .fetch_optional(&state.pool)
1065    .await
1066    .map_err(|e| {
1067        error!("Database error while fetching file metadata: {}", e);
1068        StatusCode::INTERNAL_SERVER_ERROR
1069    })?;
1070
1071    let Some(record) = file_record else {
1072        info!("File not found: run_id={}, path={}", run_id, file_path);
1073        return Err(StatusCode::NOT_FOUND);
1074    };
1075
1076    // Read the file from storage
1077    let file_data = tokio::fs::read(&record.storage_path).await.map_err(|e| {
1078        error!(
1079            "Failed to read file from storage {}: {}",
1080            record.storage_path, e
1081        );
1082        StatusCode::INTERNAL_SERVER_ERROR
1083    })?;
1084
1085    info!(
1086        "Successfully read file: {} bytes from {}",
1087        file_data.len(),
1088        record.storage_path
1089    );
1090
1091    // Determine content type (use stored value or guess from filename)
1092    let content_type = record.content_type.unwrap_or_else(|| {
1093        mime_guess::from_path(&record.file_path)
1094            .first_or_octet_stream()
1095            .to_string()
1096    });
1097
1098    // Extract filename from path for Content-Disposition
1099    let filename = std::path::Path::new(&record.file_path)
1100        .file_name()
1101        .and_then(|n| n.to_str())
1102        .unwrap_or("download");
1103
1104    // Build response with appropriate headers
1105    Response::builder()
1106        .status(StatusCode::OK)
1107        .header(header::CONTENT_TYPE, content_type)
1108        .header(
1109            header::CONTENT_DISPOSITION,
1110            format!("inline; filename=\"{filename}\""),
1111        )
1112        .body(Body::from(file_data))
1113        .map_err(|e| {
1114            error!("Failed to build response: {}", e);
1115            StatusCode::INTERNAL_SERVER_ERROR
1116        })
1117}