capsula_server/
lib.rs

1use askama::Template;
2use askama_web::WebTemplate;
3
4#[expect(
5    clippy::unnecessary_wraps,
6    reason = "Askama filter_fn macro generates code that triggers this lint, but Result return type is required by the Askama filter API"
7)]
8mod filters {
9    use askama::filter_fn;
10
11    #[filter_fn]
12    pub fn format_command(s: &str, _env: &dyn askama::Values) -> ::askama::Result<String> {
13        // Try to parse as JSON array
14        Ok(serde_json::from_str::<Vec<String>>(s).map_or_else(
15            |_| s.to_string(),
16            |cmd_array| {
17                shlex::try_join(cmd_array.iter().map(String::as_str))
18                    .unwrap_or_else(|_| cmd_array.join(" "))
19            },
20        ))
21    }
22
23    #[filter_fn]
24    pub fn pretty_json<T: std::fmt::Display>(
25        s: T,
26        _env: &dyn askama::Values,
27    ) -> ::askama::Result<String> {
28        let s_str = s.to_string();
29        // Try to parse as JSON and pretty-print with 2-space indentation
30        Ok(serde_json::from_str::<serde_json::Value>(&s_str)
31            .ok()
32            .and_then(|value| serde_json::to_string_pretty(&value).ok())
33            .unwrap_or(s_str))
34    }
35}
36
37use axum::{
38    Router,
39    body::Body,
40    extract::{Multipart, Path, Query, State},
41    http::{StatusCode, header},
42    response::{IntoResponse, Json, Response},
43    routing::{get, post},
44};
45use capsula_api_types::VaultInfo;
46use serde_json::json;
47use sha2::{Digest, Sha256};
48use sqlx::PgPool;
49use std::collections::VecDeque;
50use std::path::PathBuf;
51use tower_http::services::ServeDir;
52use tracing::{error, info, warn};
53
54#[derive(Clone)]
55pub struct AppState {
56    pub pool: PgPool,
57    pub storage_path: PathBuf,
58}
59
60mod models;
61
62#[derive(Template, WebTemplate)]
63#[template(path = "index.html")]
64struct IndexTemplate;
65
66#[derive(Template, WebTemplate)]
67#[template(path = "vaults.html")]
68struct VaultsTemplate {
69    vaults: Vec<VaultInfo>,
70}
71
72#[derive(Template, WebTemplate)]
73#[template(path = "runs.html")]
74struct RunsTemplate {
75    runs: Vec<models::Run>,
76    vault: Option<String>,
77    page: i64,
78    total_pages: i64,
79}
80
81#[derive(Template, WebTemplate)]
82#[template(path = "run_detail.html")]
83struct RunDetailTemplate {
84    run: models::Run,
85    pre_run_hooks: Vec<models::HookOutput>,
86    post_run_hooks: Vec<models::HookOutput>,
87    files: Vec<models::CapturedFile>,
88}
89
90#[derive(Template, WebTemplate)]
91#[template(path = "error.html")]
92struct ErrorTemplate {
93    status_code: u16,
94    title: String,
95    message: String,
96}
97
98pub async fn create_pool(database_url: &str, max_connections: u32) -> Result<PgPool, sqlx::Error> {
99    sqlx::postgres::PgPoolOptions::new()
100        .max_connections(max_connections)
101        .acquire_timeout(std::time::Duration::from_secs(3))
102        .connect(database_url)
103        .await
104}
105
106pub fn build_app(pool: PgPool, storage_path: PathBuf) -> Router {
107    let static_dir: PathBuf = std::env::var("CAPSULA_STATIC_DIR")
108        .expect("CAPSULA_STATIC_DIR environment variable must be set")
109        .into();
110
111    let state = AppState { pool, storage_path };
112
113    Router::new()
114        .route("/", get(index))
115        .route("/vaults", get(vaults_page))
116        .route("/runs", get(runs_page))
117        .route("/runs/{id}", get(run_detail_page))
118        .route("/health", get(health_check))
119        .route("/api/v1/vaults", get(list_vaults))
120        .route("/api/v1/vaults/{name}", get(get_vault_info))
121        .route("/api/v1/runs", post(create_run).get(list_runs))
122        .route("/api/v1/runs/{id}", get(get_run))
123        .route("/api/v1/runs/{id}/files/{*path}", get(download_file))
124        .route("/api/v1/upload", post(upload_files))
125        .nest_service("/static", ServeDir::new(static_dir))
126        .fallback(not_found)
127        .with_state(state)
128}
129
130async fn index() -> impl IntoResponse {
131    IndexTemplate
132}
133
134async fn not_found() -> impl IntoResponse {
135    (
136        StatusCode::NOT_FOUND,
137        ErrorTemplate {
138            status_code: 404,
139            title: "Page Not Found".to_string(),
140            message: "The page you are looking for does not exist.".to_string(),
141        },
142    )
143}
144
145async fn vaults_page(State(state): State<AppState>) -> impl IntoResponse {
146    info!("Rendering vaults page");
147
148    let result = sqlx::query!(
149        r#"
150        SELECT vault as name, COUNT(*) as "run_count!"
151        FROM runs
152        GROUP BY vault
153        ORDER BY vault
154        "#
155    )
156    .fetch_all(&state.pool)
157    .await;
158
159    match result {
160        Ok(rows) => {
161            let vaults: Vec<VaultInfo> = rows
162                .into_iter()
163                .map(|row| VaultInfo {
164                    name: row.name,
165                    run_count: row.run_count,
166                })
167                .collect();
168            VaultsTemplate { vaults }
169        }
170        Err(e) => {
171            error!("Failed to fetch vaults: {}", e);
172            // Return empty vaults on error
173            VaultsTemplate { vaults: Vec::new() }
174        }
175    }
176}
177
178async fn runs_page(
179    State(state): State<AppState>,
180    Query(params): Query<models::ListRunsQuery>,
181) -> impl IntoResponse {
182    let page = params.offset.unwrap_or(0) / params.limit.unwrap_or(50) + 1;
183    let limit = params.limit.unwrap_or(50);
184    let offset = (page - 1) * limit;
185
186    info!(
187        "Rendering runs page: page={}, vault={:?}",
188        page, params.vault
189    );
190
191    // Get total count for pagination
192    let total_count = if let Some(ref vault) = params.vault {
193        sqlx::query_scalar!(
194            r#"
195            SELECT COUNT(*)::bigint
196            FROM runs
197            WHERE vault = $1
198            "#,
199            vault
200        )
201        .fetch_one(&state.pool)
202        .await
203        .unwrap_or(Some(0))
204        .unwrap_or(0)
205    } else {
206        sqlx::query_scalar!(
207            r#"
208            SELECT COUNT(*)::bigint
209            FROM runs
210            "#
211        )
212        .fetch_one(&state.pool)
213        .await
214        .unwrap_or(Some(0))
215        .unwrap_or(0)
216    };
217
218    let total_pages = (total_count + limit - 1) / limit;
219
220    // Fetch runs
221    let runs_result = if let Some(ref vault) = params.vault {
222        sqlx::query_as!(
223            models::Run,
224            r#"
225            SELECT id, name, timestamp, command, vault, project_root,
226                   exit_code, duration_ms, stdout, stderr,
227                   created_at, updated_at
228            FROM runs
229            WHERE vault = $1
230            ORDER BY timestamp DESC
231            LIMIT $2 OFFSET $3
232            "#,
233            vault,
234            limit,
235            offset
236        )
237        .fetch_all(&state.pool)
238        .await
239    } else {
240        sqlx::query_as!(
241            models::Run,
242            r#"
243            SELECT id, name, timestamp, command, vault, project_root,
244                   exit_code, duration_ms, stdout, stderr,
245                   created_at, updated_at
246            FROM runs
247            ORDER BY timestamp DESC
248            LIMIT $1 OFFSET $2
249            "#,
250            limit,
251            offset
252        )
253        .fetch_all(&state.pool)
254        .await
255    };
256
257    match runs_result {
258        Ok(runs) => RunsTemplate {
259            runs,
260            vault: params.vault,
261            page,
262            total_pages,
263        },
264        Err(e) => {
265            error!("Failed to fetch runs: {}", e);
266            RunsTemplate {
267                runs: Vec::new(),
268                vault: params.vault,
269                page: 1,
270                total_pages: 1,
271            }
272        }
273    }
274}
275
276async fn run_detail_page(
277    State(state): State<AppState>,
278    Path(id): Path<String>,
279) -> Result<RunDetailTemplate, StatusCode> {
280    info!("Rendering run detail page for: {}", id);
281
282    // Fetch run metadata
283    let run = sqlx::query_as!(
284        models::Run,
285        r#"
286        SELECT id, name, timestamp, command, vault, project_root,
287               exit_code, duration_ms, stdout, stderr,
288               created_at, updated_at
289        FROM runs
290        WHERE id = $1
291        "#,
292        id
293    )
294    .fetch_optional(&state.pool)
295    .await
296    .map_err(|e| {
297        error!("Database error while fetching run: {}", e);
298        StatusCode::INTERNAL_SERVER_ERROR
299    })?
300    .ok_or_else(|| {
301        info!("Run not found: {}", id);
302        StatusCode::NOT_FOUND
303    })?;
304
305    // Fetch hook outputs
306    let hook_outputs_result = sqlx::query_as!(
307        models::RunOutputRow,
308        r#"
309        SELECT phase, hook_id, config, output, success, error
310        FROM run_outputs
311        WHERE run_id = $1
312        ORDER BY id
313        "#,
314        id
315    )
316    .fetch_all(&state.pool)
317    .await;
318
319    let (pre_run_hooks, post_run_hooks) = match hook_outputs_result {
320        Ok(rows) => {
321            let mut pre_hooks = Vec::new();
322            let mut post_hooks = Vec::new();
323
324            for row in rows {
325                let hook_output = models::HookOutput {
326                    meta: models::HookMeta {
327                        id: row.hook_id,
328                        config: row.config,
329                        success: row.success,
330                        error: row.error,
331                    },
332                    output: row.output,
333                };
334
335                if row.phase == "pre" {
336                    pre_hooks.push(hook_output);
337                } else if row.phase == "post" {
338                    post_hooks.push(hook_output);
339                } else {
340                    warn!("Unknown hook phase: {}", row.phase);
341                }
342            }
343
344            (pre_hooks, post_hooks)
345        }
346        Err(e) => {
347            error!("Failed to fetch hook outputs: {}", e);
348            (Vec::new(), Vec::new())
349        }
350    };
351
352    // Fetch captured files
353    let files_result = sqlx::query_as!(
354        models::CapturedFile,
355        r#"
356        SELECT path, size, hash, storage_path, content_type
357        FROM captured_files
358        WHERE run_id = $1
359        ORDER BY path
360        "#,
361        id
362    )
363    .fetch_all(&state.pool)
364    .await;
365
366    let files = match files_result {
367        Ok(files) => files,
368        Err(e) => {
369            error!("Failed to fetch captured files: {}", e);
370            Vec::new()
371        }
372    };
373
374    Ok(RunDetailTemplate {
375        run,
376        pre_run_hooks,
377        post_run_hooks,
378        files,
379    })
380}
381
382async fn health_check(State(state): State<AppState>) -> impl IntoResponse {
383    match sqlx::query("SELECT 1").fetch_one(&state.pool).await {
384        Ok(_) => Json(json!({
385            "status": "ok",
386            "database": "connected"
387        })),
388        Err(e) => Json(json!({
389            "status": "error",
390            "database": "disconnected",
391            "error": e.to_string()
392        })),
393    }
394}
395
396async fn list_vaults(State(state): State<AppState>) -> impl IntoResponse {
397    info!("Listing all vaults");
398
399    let result = sqlx::query!(
400        r#"
401        SELECT vault as name, COUNT(*) as "run_count!"
402        FROM runs
403        GROUP BY vault
404        ORDER BY vault
405        "#
406    )
407    .fetch_all(&state.pool)
408    .await;
409
410    match result {
411        Ok(rows) => {
412            let vaults: Vec<VaultInfo> = rows
413                .into_iter()
414                .map(|row| VaultInfo {
415                    name: row.name,
416                    run_count: row.run_count,
417                })
418                .collect();
419            info!("Found {} vaults", vaults.len());
420            let response = capsula_api_types::VaultsResponse {
421                status: "ok".to_string(),
422                vaults,
423            };
424            Json(serde_json::to_value(response).expect("Failed to serialize VaultsResponse"))
425        }
426        Err(e) => {
427            error!("Failed to list vaults: {}", e);
428            let response = capsula_api_types::ErrorResponse {
429                status: "error".to_string(),
430                error: e.to_string(),
431            };
432            Json(serde_json::to_value(response).expect("Failed to serialize ErrorResponse"))
433        }
434    }
435}
436
437async fn get_vault_info(
438    State(state): State<AppState>,
439    Path(name): Path<String>,
440) -> impl IntoResponse {
441    info!("Getting vault info: {}", name);
442
443    let result = sqlx::query!(
444        r#"
445        SELECT vault as name, COUNT(*) as "run_count!"
446        FROM runs
447        WHERE vault = $1
448        GROUP BY vault
449        "#,
450        name
451    )
452    .fetch_optional(&state.pool)
453    .await;
454
455    match result {
456        Ok(Some(row)) => {
457            let vault = VaultInfo {
458                name: row.name,
459                run_count: row.run_count,
460            };
461            info!("Found vault: {} with {} runs", vault.name, vault.run_count);
462            let response = capsula_api_types::VaultExistsResponse {
463                status: "ok".to_string(),
464                exists: true,
465                vault: Some(vault),
466            };
467            Json(serde_json::to_value(response).expect("Failed to serialize VaultExistsResponse"))
468        }
469        Ok(None) => {
470            info!("Vault not found: {}", name);
471            let response = capsula_api_types::VaultExistsResponse {
472                status: "ok".to_string(),
473                exists: false,
474                vault: None,
475            };
476            Json(serde_json::to_value(response).expect("Failed to serialize VaultExistsResponse"))
477        }
478        Err(e) => {
479            error!("Failed to get vault info: {}", e);
480            let response = capsula_api_types::ErrorResponse {
481                status: "error".to_string(),
482                error: e.to_string(),
483            };
484            Json(serde_json::to_value(response).expect("Failed to serialize ErrorResponse"))
485        }
486    }
487}
488
489async fn list_runs(
490    State(state): State<AppState>,
491    Query(params): Query<models::ListRunsQuery>,
492) -> impl IntoResponse {
493    let limit = params.limit.unwrap_or(100);
494    let offset = params.offset.unwrap_or(0);
495
496    if let Some(ref vault) = params.vault {
497        info!(
498            "Listing runs for vault: {} (limit={}, offset={})",
499            vault, limit, offset
500        );
501    } else {
502        info!("Listing all runs (limit={}, offset={})", limit, offset);
503    }
504
505    let result = if let Some(vault) = params.vault {
506        sqlx::query_as!(
507            models::Run,
508            r#"
509            SELECT id, name, timestamp, command, vault, project_root,
510                   exit_code, duration_ms, stdout, stderr,
511                   created_at, updated_at
512            FROM runs
513            WHERE vault = $1
514            ORDER BY timestamp DESC
515            LIMIT $2 OFFSET $3
516            "#,
517            vault,
518            limit,
519            offset
520        )
521        .fetch_all(&state.pool)
522        .await
523    } else {
524        sqlx::query_as!(
525            models::Run,
526            r#"
527            SELECT id, name, timestamp, command, vault, project_root,
528                   exit_code, duration_ms, stdout, stderr,
529                   created_at, updated_at
530            FROM runs
531            ORDER BY timestamp DESC
532            LIMIT $1 OFFSET $2
533            "#,
534            limit,
535            offset
536        )
537        .fetch_all(&state.pool)
538        .await
539    };
540
541    match result {
542        Ok(runs) => {
543            info!("Found {} runs", runs.len());
544            Json(json!({
545                "status": "ok",
546                "runs": runs,
547                "limit": limit,
548                "offset": offset
549            }))
550        }
551        Err(e) => {
552            error!("Failed to list runs: {}", e);
553            Json(json!({
554                "status": "error",
555                "error": e.to_string()
556            }))
557        }
558    }
559}
560
561async fn create_run(
562    State(state): State<AppState>,
563    Json(request): Json<models::CreateRunRequest>,
564) -> impl IntoResponse {
565    info!("Creating run: {}", request.id);
566
567    let result = sqlx::query!(
568        r#"
569        INSERT INTO runs (
570            id, name, timestamp, command, vault, project_root,
571            exit_code, duration_ms, stdout, stderr
572        ) VALUES (
573            $1, $2, $3, $4, $5, $6,
574            $7, $8, $9, $10
575        )
576        "#,
577        request.id,
578        request.name,
579        request.timestamp,
580        request.command,
581        request.vault,
582        request.project_root,
583        request.exit_code,
584        request.duration_ms,
585        request.stdout,
586        request.stderr
587    )
588    .execute(&state.pool)
589    .await;
590
591    match result {
592        Ok(_) => {
593            info!("Run created successfully");
594            (
595                StatusCode::CREATED,
596                Json(json!({
597                    "status": "created",
598                    "run": request
599                })),
600            )
601        }
602        Err(e) => {
603            error!("Failed to insert run: {}", e);
604            let status = if e.to_string().contains("duplicate key") {
605                StatusCode::CONFLICT
606            } else {
607                StatusCode::INTERNAL_SERVER_ERROR
608            };
609            (
610                status,
611                Json(json!({
612                    "status": "error",
613                    "error": e.to_string()
614                })),
615            )
616        }
617    }
618}
619
620async fn get_run(State(state): State<AppState>, Path(id): Path<String>) -> impl IntoResponse {
621    info!("Getting run: {}", id);
622
623    let result = sqlx::query_as!(
624        models::Run,
625        r#"
626        SELECT id, name, timestamp, command, vault, project_root,
627               exit_code, duration_ms, stdout, stderr,
628               created_at, updated_at
629        FROM runs
630        WHERE id = $1
631        "#,
632        id
633    )
634    .fetch_optional(&state.pool)
635    .await;
636
637    match result {
638        Ok(Some(run)) => {
639            info!("Found run: {}", run.id);
640
641            // Fetch hook outputs
642            let hook_outputs_result = sqlx::query_as!(
643                models::RunOutputRow,
644                r#"
645                SELECT phase, hook_id, config, output, success, error
646                FROM run_outputs
647                WHERE run_id = $1
648                ORDER BY id
649                "#,
650                id
651            )
652            .fetch_all(&state.pool)
653            .await;
654
655            let (pre_run_hooks, post_run_hooks) = match hook_outputs_result {
656                Ok(rows) => {
657                    let mut pre_hooks = Vec::new();
658                    let mut post_hooks = Vec::new();
659
660                    for row in rows {
661                        let hook_output = models::HookOutput {
662                            meta: models::HookMeta {
663                                id: row.hook_id,
664                                config: row.config,
665                                success: row.success,
666                                error: row.error,
667                            },
668                            output: row.output,
669                        };
670
671                        if row.phase == "pre" {
672                            pre_hooks.push(hook_output);
673                        } else if row.phase == "post" {
674                            post_hooks.push(hook_output);
675                        } else {
676                            warn!("Unknown hook phase: {}", row.phase);
677                        }
678                    }
679
680                    info!(
681                        "Found {} pre-run hooks and {} post-run hooks",
682                        pre_hooks.len(),
683                        post_hooks.len()
684                    );
685                    (pre_hooks, post_hooks)
686                }
687                Err(e) => {
688                    error!("Failed to fetch hook outputs: {}", e);
689                    (Vec::new(), Vec::new())
690                }
691            };
692
693            Json(json!({
694                "status": "ok",
695                "run": run,
696                "pre_run_hooks": pre_run_hooks,
697                "post_run_hooks": post_run_hooks
698            }))
699        }
700        Ok(None) => {
701            info!("Run not found: {}", id);
702            Json(json!({
703                "status": "not_found",
704                "error": format!("Run with id {} not found", id)
705            }))
706        }
707        Err(e) => {
708            error!("Failed to retrieve run: {}", e);
709            Json(json!({
710                "status": "error",
711                "error": e.to_string()
712            }))
713        }
714    }
715}
716
717#[expect(clippy::too_many_lines, reason = "TODO: Refactor later")]
718#[expect(
719    clippy::else_if_without_else,
720    reason = "There is `continue` or `return` in each branch, so `else` is redundant"
721)]
722async fn upload_files(
723    State(state): State<AppState>,
724    mut multipart: Multipart,
725) -> impl IntoResponse {
726    let storage_path = &state.storage_path;
727    info!("Received file upload request");
728
729    if let Err(e) = tokio::fs::create_dir_all(&storage_path).await {
730        error!("Failed to create storage directory: {}", e);
731        return Json(json!({
732            "status": "error",
733            "error": format!("Failed to create storage directory: {}", e)
734        }));
735    }
736
737    let mut files_processed = 0;
738    let mut total_bytes = 0u64;
739    let mut run_id: Option<String> = None;
740    let mut pending_paths: VecDeque<String> = VecDeque::new();
741    let mut pre_run_hooks: Option<Vec<models::HookOutput>> = None;
742    let mut post_run_hooks: Option<Vec<models::HookOutput>> = None;
743
744    while let Ok(Some(field)) = multipart.next_field().await {
745        let field_name = field.name().unwrap_or("unknown").to_string();
746
747        if field_name == "run_id" {
748            match field.text().await {
749                Ok(text) => {
750                    run_id = Some(text);
751                    continue;
752                }
753                Err(e) => {
754                    error!("Failed to read run_id field: {}", e);
755                    return Json(json!({
756                        "status": "error",
757                        "error": format!("Failed to read run_id: {}", e)
758                    }));
759                }
760            }
761        } else if field_name == "path" {
762            match field.text().await {
763                Ok(text) => {
764                    pending_paths.push_back(text);
765                    continue;
766                }
767                Err(e) => {
768                    error!("Failed to read path field: {}", e);
769                    return Json(json!({
770                        "status": "error",
771                        "error": format!("Failed to read path: {}", e)
772                    }));
773                }
774            }
775        } else if field_name == "pre_run" {
776            match field.text().await {
777                Ok(text) => match serde_json::from_str::<Vec<models::HookOutput>>(&text) {
778                    Ok(hooks) => {
779                        info!("Parsed {} pre-run hooks", hooks.len());
780                        pre_run_hooks = Some(hooks);
781                        continue;
782                    }
783                    Err(e) => {
784                        error!("Failed to parse pre_run JSON: {}", e);
785                        return Json(json!({
786                            "status": "error",
787                            "error": format!("Failed to parse pre_run JSON: {}", e)
788                        }));
789                    }
790                },
791                Err(e) => {
792                    error!("Failed to read pre_run field: {}", e);
793                    return Json(json!({
794                        "status": "error",
795                        "error": format!("Failed to read pre_run: {}", e)
796                    }));
797                }
798            }
799        } else if field_name == "post_run" {
800            match field.text().await {
801                Ok(text) => match serde_json::from_str::<Vec<models::HookOutput>>(&text) {
802                    Ok(hooks) => {
803                        info!("Parsed {} post-run hooks", hooks.len());
804                        post_run_hooks = Some(hooks);
805                        continue;
806                    }
807                    Err(e) => {
808                        error!("Failed to parse post_run JSON: {}", e);
809                        return Json(json!({
810                            "status": "error",
811                            "error": format!("Failed to parse post_run JSON: {}", e)
812                        }));
813                    }
814                },
815                Err(e) => {
816                    error!("Failed to read post_run field: {}", e);
817                    return Json(json!({
818                        "status": "error",
819                        "error": format!("Failed to read post_run: {}", e)
820                    }));
821                }
822            }
823        }
824
825        let file_name = field.file_name().unwrap_or("unknown").to_string();
826        let content_type = field
827            .content_type()
828            .unwrap_or("application/octet-stream")
829            .to_string();
830
831        info!(
832            "Processing file: field_name={}, file_name={}, content_type={}",
833            field_name, file_name, content_type
834        );
835
836        match field.bytes().await {
837            Ok(data) => {
838                let size = data.len();
839                total_bytes += size as u64;
840                let Ok(size_i64) = i64::try_from(size) else {
841                    error!("File too large to store size in database: {} bytes", size);
842                    return Json(json!({
843                        "status": "error",
844                        "error": "File too large to store size in database"
845                    }));
846                };
847
848                let mut hasher = Sha256::new();
849                hasher.update(&data);
850                let hash = format!("{:x}", hasher.finalize());
851
852                info!("File hash: {}, size: {} bytes", hash, size);
853
854                let hash_dir = &hash[0..2];
855                let file_storage_dir = storage_path.join(hash_dir);
856                if let Err(e) = tokio::fs::create_dir_all(&file_storage_dir).await {
857                    error!("Failed to create hash directory: {}", e);
858                    return Json(json!({
859                        "status": "error",
860                        "error": format!("Failed to create storage directory: {}", e)
861                    }));
862                }
863
864                let file_storage_path = file_storage_dir.join(&hash);
865                let storage_path_str = file_storage_path.to_string_lossy().to_string();
866
867                if file_storage_path.exists() {
868                    info!("File already exists (deduplicated): {}", storage_path_str);
869                } else {
870                    if let Err(e) = tokio::fs::write(&file_storage_path, &data).await {
871                        error!("Failed to write file: {}", e);
872                        return Json(json!({
873                            "status": "error",
874                            "error": format!("Failed to write file: {}", e)
875                        }));
876                    }
877                    info!("Saved new file: {}", storage_path_str);
878                }
879
880                if let Some(ref rid) = run_id {
881                    let relative_path = pending_paths
882                        .pop_front()
883                        .or_else(|| {
884                            if file_name == "unknown" {
885                                None
886                            } else {
887                                Some(file_name.clone())
888                            }
889                        })
890                        .unwrap_or_else(|| {
891                            if field_name != "unknown" && field_name != "file" {
892                                field_name.clone()
893                            } else {
894                                format!("file-{}", files_processed + 1)
895                            }
896                        });
897
898                    let result = sqlx::query!(
899                        r#"
900                        INSERT INTO captured_files (run_id, path, size, hash, storage_path, content_type)
901                        VALUES ($1, $2, $3, $4, $5, $6)
902                        ON CONFLICT (run_id, path) DO UPDATE
903                        SET size = EXCLUDED.size,
904                            hash = EXCLUDED.hash,
905                            storage_path = EXCLUDED.storage_path,
906                            content_type = EXCLUDED.content_type
907                        "#,
908                        rid,
909                        relative_path,
910                        size_i64,
911                        hash,
912                        storage_path_str,
913                        content_type
914                    )
915                    .execute(&state.pool)
916                    .await;
917
918                    match result {
919                        Ok(_) => {
920                            info!("Stored file metadata in database");
921                        }
922                        Err(e) => {
923                            error!("Failed to store file metadata: {}", e);
924                            return Json(json!({
925                                "status": "error",
926                                "error": format!("Failed to store file metadata: {}", e)
927                            }));
928                        }
929                    }
930                }
931
932                files_processed += 1;
933                info!("Successfully processed file: {} bytes", size);
934            }
935            Err(e) => {
936                error!("Failed to read file field: {}", e);
937                return Json(json!({
938                    "status": "error",
939                    "error": format!("Failed to read file: {}", e)
940                }));
941            }
942        }
943    }
944
945    info!(
946        "Upload complete: {} files, {} bytes total",
947        files_processed, total_bytes
948    );
949
950    // Store hook outputs if provided
951    let mut pre_run_count = 0;
952    let mut post_run_count = 0;
953
954    if let Some(ref rid) = run_id {
955        if let Some(hooks) = pre_run_hooks {
956            for hook in hooks {
957                let result = sqlx::query!(
958                    r#"
959                    INSERT INTO run_outputs (run_id, phase, hook_id, config, output, success, error)
960                    VALUES ($1, 'pre', $2, $3, $4, $5, $6)
961                    "#,
962                    rid,
963                    hook.meta.id,
964                    hook.meta.config,
965                    hook.output,
966                    hook.meta.success,
967                    hook.meta.error
968                )
969                .execute(&state.pool)
970                .await;
971
972                match result {
973                    Ok(_) => {
974                        pre_run_count += 1;
975                        info!("Stored pre-run hook: {}", hook.meta.id);
976                    }
977                    Err(e) => {
978                        error!("Failed to store pre-run hook {}: {}", hook.meta.id, e);
979                        return Json(json!({
980                            "status": "error",
981                            "error": format!("Failed to store pre-run hook {}: {}", hook.meta.id, e)
982                        }));
983                    }
984                }
985            }
986        }
987
988        if let Some(hooks) = post_run_hooks {
989            for hook in hooks {
990                let result = sqlx::query!(
991                    r#"
992                    INSERT INTO run_outputs (run_id, phase, hook_id, config, output, success, error)
993                    VALUES ($1, 'post', $2, $3, $4, $5, $6)
994                    "#,
995                    rid,
996                    hook.meta.id,
997                    hook.meta.config,
998                    hook.output,
999                    hook.meta.success,
1000                    hook.meta.error
1001                )
1002                .execute(&state.pool)
1003                .await;
1004
1005                match result {
1006                    Ok(_) => {
1007                        post_run_count += 1;
1008                        info!("Stored post-run hook: {}", hook.meta.id);
1009                    }
1010                    Err(e) => {
1011                        error!("Failed to store post-run hook {}: {}", hook.meta.id, e);
1012                        return Json(json!({
1013                            "status": "error",
1014                            "error": format!("Failed to store post-run hook {}: {}", hook.meta.id, e)
1015                        }));
1016                    }
1017                }
1018            }
1019        }
1020    }
1021
1022    info!(
1023        "Hook outputs stored: {} pre-run, {} post-run",
1024        pre_run_count, post_run_count
1025    );
1026
1027    let response = capsula_api_types::UploadResponse {
1028        status: "ok".to_string(),
1029        files_processed,
1030        total_bytes,
1031        pre_run_hooks: pre_run_count,
1032        post_run_hooks: post_run_count,
1033    };
1034    Json(serde_json::to_value(response).expect("Failed to serialize UploadResponse"))
1035}
1036
1037async fn download_file(
1038    State(state): State<AppState>,
1039    Path((run_id, file_path)): Path<(String, String)>,
1040) -> Result<Response, StatusCode> {
1041    info!("Downloading file: run_id={}, path={}", run_id, file_path);
1042
1043    // Query the database for file metadata
1044    let file_record = sqlx::query!(
1045        r#"
1046        SELECT storage_path, content_type, path as file_path
1047        FROM captured_files
1048        WHERE run_id = $1 AND path = $2
1049        "#,
1050        run_id,
1051        file_path
1052    )
1053    .fetch_optional(&state.pool)
1054    .await
1055    .map_err(|e| {
1056        error!("Database error while fetching file metadata: {}", e);
1057        StatusCode::INTERNAL_SERVER_ERROR
1058    })?;
1059
1060    let Some(record) = file_record else {
1061        info!("File not found: run_id={}, path={}", run_id, file_path);
1062        return Err(StatusCode::NOT_FOUND);
1063    };
1064
1065    // Read the file from storage
1066    let file_data = tokio::fs::read(&record.storage_path).await.map_err(|e| {
1067        error!(
1068            "Failed to read file from storage {}: {}",
1069            record.storage_path, e
1070        );
1071        StatusCode::INTERNAL_SERVER_ERROR
1072    })?;
1073
1074    info!(
1075        "Successfully read file: {} bytes from {}",
1076        file_data.len(),
1077        record.storage_path
1078    );
1079
1080    // Determine content type (use stored value or guess from filename)
1081    let content_type = record.content_type.unwrap_or_else(|| {
1082        mime_guess::from_path(&record.file_path)
1083            .first_or_octet_stream()
1084            .to_string()
1085    });
1086
1087    // Extract filename from path for Content-Disposition
1088    let filename = std::path::Path::new(&record.file_path)
1089        .file_name()
1090        .and_then(|n| n.to_str())
1091        .unwrap_or("download");
1092
1093    // Build response with appropriate headers
1094    Response::builder()
1095        .status(StatusCode::OK)
1096        .header(header::CONTENT_TYPE, content_type)
1097        .header(
1098            header::CONTENT_DISPOSITION,
1099            format!("inline; filename=\"{filename}\""),
1100        )
1101        .body(Body::from(file_data))
1102        .map_err(|e| {
1103            error!("Failed to build response: {}", e);
1104            StatusCode::INTERNAL_SERVER_ERROR
1105        })
1106}