Skip to main content

capsula_server/
lib.rs

1use askama::Template;
2use askama_web::WebTemplate;
3
4#[expect(
5    clippy::unnecessary_wraps,
6    reason = "Askama filter_fn macro generates code that triggers this lint, but Result return type is required by the Askama filter API"
7)]
8#[expect(
9    clippy::inline_always,
10    clippy::unused_self,
11    reason = "Askama's #[filter_fn] macro generates builder pattern code with #[inline(always)] that triggers these lints"
12)]
13mod filters {
14    use askama::filter_fn;
15
16    #[filter_fn]
17    pub fn format_command(s: &str, _env: &dyn askama::Values) -> ::askama::Result<String> {
18        // Try to parse as JSON array
19        Ok(serde_json::from_str::<Vec<String>>(s).map_or_else(
20            |_| s.to_string(),
21            |cmd_array| {
22                shlex::try_join(cmd_array.iter().map(String::as_str))
23                    .unwrap_or_else(|_| cmd_array.join(" "))
24            },
25        ))
26    }
27
28    #[filter_fn]
29    pub fn pretty_json<T: std::fmt::Display>(
30        s: T,
31        _env: &dyn askama::Values,
32    ) -> ::askama::Result<String> {
33        let s_str = s.to_string();
34        // Try to parse as JSON and pretty-print with 2-space indentation
35        Ok(serde_json::from_str::<serde_json::Value>(&s_str)
36            .ok()
37            .and_then(|value| serde_json::to_string_pretty(&value).ok())
38            .unwrap_or(s_str))
39    }
40}
41
42use axum::{
43    Router,
44    body::Body,
45    extract::{DefaultBodyLimit, Multipart, Path, Query, State},
46    http::{StatusCode, header},
47    response::{IntoResponse, Json, Response},
48    routing::{get, post},
49};
50use capsula_api_types::VaultInfo;
51use serde_json::json;
52use sha2::{Digest, Sha256};
53use sqlx::PgPool;
54use std::collections::VecDeque;
55use std::path::PathBuf;
56use tower_http::services::ServeDir;
57use tracing::{error, info, warn};
58
59#[derive(Clone)]
60pub struct AppState {
61    pub pool: PgPool,
62    pub storage_path: PathBuf,
63}
64
65mod models;
66mod query;
67
68#[derive(Template, WebTemplate)]
69#[template(path = "index.html")]
70struct IndexTemplate;
71
72#[derive(Template, WebTemplate)]
73#[template(path = "vaults.html")]
74struct VaultsTemplate {
75    vaults: Vec<VaultInfo>,
76}
77
78#[derive(Template, WebTemplate)]
79#[template(path = "runs.html")]
80struct RunsTemplate {
81    runs: Vec<models::Run>,
82    vault: Option<String>,
83    page: i64,
84    total_pages: i64,
85}
86
87#[derive(Template, WebTemplate)]
88#[template(path = "run_detail.html")]
89struct RunDetailTemplate {
90    run: models::Run,
91    pre_run_hooks: Vec<models::HookOutput>,
92    post_run_hooks: Vec<models::HookOutput>,
93    files: Vec<models::CapturedFile>,
94}
95
96#[derive(Template, WebTemplate)]
97#[template(path = "error.html")]
98struct ErrorTemplate {
99    status_code: u16,
100    title: String,
101    message: String,
102}
103
104pub async fn create_pool(database_url: &str, max_connections: u32) -> Result<PgPool, sqlx::Error> {
105    sqlx::postgres::PgPoolOptions::new()
106        .max_connections(max_connections)
107        .acquire_timeout(std::time::Duration::from_secs(3))
108        .connect(database_url)
109        .await
110}
111
112pub fn build_app(pool: PgPool, storage_path: PathBuf, max_body_size: usize) -> Router {
113    let static_dir: PathBuf = std::env::var("CAPSULA_STATIC_DIR")
114        .expect("CAPSULA_STATIC_DIR environment variable must be set")
115        .into();
116
117    let state = AppState { pool, storage_path };
118
119    Router::new()
120        .route("/", get(index))
121        .route("/vaults", get(vaults_page))
122        .route("/runs", get(runs_page))
123        .route("/runs/{id}", get(run_detail_page))
124        .route("/health", get(health_check))
125        .route("/api/v1/vaults", get(list_vaults))
126        .route("/api/v1/vaults/{name}", get(get_vault_info))
127        .route("/api/v1/runs", post(create_run).get(list_runs))
128        .route("/api/v1/runs/search", post(search_runs))
129        .route("/api/v1/runs/{id}", get(get_run))
130        .route("/api/v1/runs/{id}/files/{*path}", get(download_file))
131        .route(
132            "/api/v1/upload",
133            post(upload_files).layer(DefaultBodyLimit::max(max_body_size)),
134        )
135        .nest_service("/static", ServeDir::new(static_dir))
136        .fallback(not_found)
137        .with_state(state)
138}
139
140async fn index() -> impl IntoResponse {
141    IndexTemplate
142}
143
144async fn not_found() -> impl IntoResponse {
145    (
146        StatusCode::NOT_FOUND,
147        ErrorTemplate {
148            status_code: 404,
149            title: "Page Not Found".to_string(),
150            message: "The page you are looking for does not exist.".to_string(),
151        },
152    )
153}
154
155async fn vaults_page(State(state): State<AppState>) -> impl IntoResponse {
156    info!("Rendering vaults page");
157
158    let result = sqlx::query!(
159        r#"
160        SELECT vault as name, COUNT(*) as "run_count!"
161        FROM runs
162        GROUP BY vault
163        ORDER BY vault
164        "#
165    )
166    .fetch_all(&state.pool)
167    .await;
168
169    match result {
170        Ok(rows) => {
171            let vaults: Vec<VaultInfo> = rows
172                .into_iter()
173                .map(|row| VaultInfo {
174                    name: row.name,
175                    run_count: row.run_count,
176                })
177                .collect();
178            VaultsTemplate { vaults }
179        }
180        Err(e) => {
181            error!("Failed to fetch vaults: {}", e);
182            // Return empty vaults on error
183            VaultsTemplate { vaults: Vec::new() }
184        }
185    }
186}
187
188async fn runs_page(
189    State(state): State<AppState>,
190    Query(params): Query<models::ListRunsQuery>,
191) -> impl IntoResponse {
192    let page = params.offset.unwrap_or(0) / params.limit.unwrap_or(50) + 1;
193    let limit = params.limit.unwrap_or(50);
194    let offset = (page - 1) * limit;
195
196    info!(
197        "Rendering runs page: page={}, vault={:?}",
198        page, params.vault
199    );
200
201    // Get total count for pagination
202    let total_count = if let Some(ref vault) = params.vault {
203        sqlx::query_scalar!(
204            r#"
205            SELECT COUNT(*)::bigint
206            FROM runs
207            WHERE vault = $1
208            "#,
209            vault
210        )
211        .fetch_one(&state.pool)
212        .await
213        .unwrap_or(Some(0))
214        .unwrap_or(0)
215    } else {
216        sqlx::query_scalar!(
217            r#"
218            SELECT COUNT(*)::bigint
219            FROM runs
220            "#
221        )
222        .fetch_one(&state.pool)
223        .await
224        .unwrap_or(Some(0))
225        .unwrap_or(0)
226    };
227
228    let total_pages = (total_count + limit - 1) / limit;
229
230    // Fetch runs
231    let runs_result = if let Some(ref vault) = params.vault {
232        sqlx::query_as!(
233            models::Run,
234            r#"
235            SELECT id, name, timestamp, command, vault, project_root,
236                   exit_code, duration_ms, stdout, stderr,
237                   created_at, updated_at
238            FROM runs
239            WHERE vault = $1
240            ORDER BY timestamp DESC
241            LIMIT $2 OFFSET $3
242            "#,
243            vault,
244            limit,
245            offset
246        )
247        .fetch_all(&state.pool)
248        .await
249    } else {
250        sqlx::query_as!(
251            models::Run,
252            r#"
253            SELECT id, name, timestamp, command, vault, project_root,
254                   exit_code, duration_ms, stdout, stderr,
255                   created_at, updated_at
256            FROM runs
257            ORDER BY timestamp DESC
258            LIMIT $1 OFFSET $2
259            "#,
260            limit,
261            offset
262        )
263        .fetch_all(&state.pool)
264        .await
265    };
266
267    match runs_result {
268        Ok(runs) => RunsTemplate {
269            runs,
270            vault: params.vault,
271            page,
272            total_pages,
273        },
274        Err(e) => {
275            error!("Failed to fetch runs: {}", e);
276            RunsTemplate {
277                runs: Vec::new(),
278                vault: params.vault,
279                page: 1,
280                total_pages: 1,
281            }
282        }
283    }
284}
285
286async fn run_detail_page(
287    State(state): State<AppState>,
288    Path(id): Path<String>,
289) -> Result<RunDetailTemplate, StatusCode> {
290    info!("Rendering run detail page for: {}", id);
291
292    // Fetch run metadata
293    let run = sqlx::query_as!(
294        models::Run,
295        r#"
296        SELECT id, name, timestamp, command, vault, project_root,
297               exit_code, duration_ms, stdout, stderr,
298               created_at, updated_at
299        FROM runs
300        WHERE id = $1
301        "#,
302        id
303    )
304    .fetch_optional(&state.pool)
305    .await
306    .map_err(|e| {
307        error!("Database error while fetching run: {}", e);
308        StatusCode::INTERNAL_SERVER_ERROR
309    })?
310    .ok_or_else(|| {
311        info!("Run not found: {}", id);
312        StatusCode::NOT_FOUND
313    })?;
314
315    // Fetch hook outputs
316    let hook_outputs_result = sqlx::query_as!(
317        models::RunOutputRow,
318        r#"
319        SELECT phase, hook_id, config, output, success, error
320        FROM run_outputs
321        WHERE run_id = $1
322        ORDER BY id
323        "#,
324        id
325    )
326    .fetch_all(&state.pool)
327    .await;
328
329    let (pre_run_hooks, post_run_hooks) = match hook_outputs_result {
330        Ok(rows) => {
331            let mut pre_hooks = Vec::new();
332            let mut post_hooks = Vec::new();
333
334            for row in rows {
335                let hook_output = models::HookOutput {
336                    meta: models::HookMeta {
337                        id: row.hook_id,
338                        config: row.config,
339                        success: row.success,
340                        error: row.error,
341                    },
342                    output: row.output,
343                };
344
345                if row.phase == "pre" {
346                    pre_hooks.push(hook_output);
347                } else if row.phase == "post" {
348                    post_hooks.push(hook_output);
349                } else {
350                    warn!("Unknown hook phase: {}", row.phase);
351                }
352            }
353
354            (pre_hooks, post_hooks)
355        }
356        Err(e) => {
357            error!("Failed to fetch hook outputs: {}", e);
358            (Vec::new(), Vec::new())
359        }
360    };
361
362    // Fetch captured files
363    let files_result = sqlx::query_as!(
364        models::CapturedFile,
365        r#"
366        SELECT path, size, hash, storage_path, content_type
367        FROM captured_files
368        WHERE run_id = $1
369        ORDER BY path
370        "#,
371        id
372    )
373    .fetch_all(&state.pool)
374    .await;
375
376    let files = match files_result {
377        Ok(files) => files,
378        Err(e) => {
379            error!("Failed to fetch captured files: {}", e);
380            Vec::new()
381        }
382    };
383
384    Ok(RunDetailTemplate {
385        run,
386        pre_run_hooks,
387        post_run_hooks,
388        files,
389    })
390}
391
392async fn health_check(State(state): State<AppState>) -> impl IntoResponse {
393    match sqlx::query("SELECT 1").fetch_one(&state.pool).await {
394        Ok(_) => Json(json!({
395            "status": "ok",
396            "database": "connected"
397        })),
398        Err(e) => Json(json!({
399            "status": "error",
400            "database": "disconnected",
401            "error": e.to_string()
402        })),
403    }
404}
405
406async fn list_vaults(State(state): State<AppState>) -> impl IntoResponse {
407    info!("Listing all vaults");
408
409    let result = sqlx::query!(
410        r#"
411        SELECT vault as name, COUNT(*) as "run_count!"
412        FROM runs
413        GROUP BY vault
414        ORDER BY vault
415        "#
416    )
417    .fetch_all(&state.pool)
418    .await;
419
420    match result {
421        Ok(rows) => {
422            let vaults: Vec<VaultInfo> = rows
423                .into_iter()
424                .map(|row| VaultInfo {
425                    name: row.name,
426                    run_count: row.run_count,
427                })
428                .collect();
429            info!("Found {} vaults", vaults.len());
430            let response = capsula_api_types::VaultsResponse {
431                status: "ok".to_string(),
432                vaults,
433            };
434            Json(serde_json::to_value(response).expect("Failed to serialize VaultsResponse"))
435        }
436        Err(e) => {
437            error!("Failed to list vaults: {}", e);
438            let response = capsula_api_types::ErrorResponse {
439                status: "error".to_string(),
440                error: e.to_string(),
441            };
442            Json(serde_json::to_value(response).expect("Failed to serialize ErrorResponse"))
443        }
444    }
445}
446
447async fn get_vault_info(
448    State(state): State<AppState>,
449    Path(name): Path<String>,
450) -> impl IntoResponse {
451    info!("Getting vault info: {}", name);
452
453    let result = sqlx::query!(
454        r#"
455        SELECT vault as name, COUNT(*) as "run_count!"
456        FROM runs
457        WHERE vault = $1
458        GROUP BY vault
459        "#,
460        name
461    )
462    .fetch_optional(&state.pool)
463    .await;
464
465    match result {
466        Ok(Some(row)) => {
467            let vault = VaultInfo {
468                name: row.name,
469                run_count: row.run_count,
470            };
471            info!("Found vault: {} with {} runs", vault.name, vault.run_count);
472            let response = capsula_api_types::VaultExistsResponse {
473                status: "ok".to_string(),
474                exists: true,
475                vault: Some(vault),
476            };
477            Json(serde_json::to_value(response).expect("Failed to serialize VaultExistsResponse"))
478        }
479        Ok(None) => {
480            info!("Vault not found: {}", name);
481            let response = capsula_api_types::VaultExistsResponse {
482                status: "ok".to_string(),
483                exists: false,
484                vault: None,
485            };
486            Json(serde_json::to_value(response).expect("Failed to serialize VaultExistsResponse"))
487        }
488        Err(e) => {
489            error!("Failed to get vault info: {}", e);
490            let response = capsula_api_types::ErrorResponse {
491                status: "error".to_string(),
492                error: e.to_string(),
493            };
494            Json(serde_json::to_value(response).expect("Failed to serialize ErrorResponse"))
495        }
496    }
497}
498
499async fn list_runs(
500    State(state): State<AppState>,
501    Query(params): Query<models::ListRunsQuery>,
502) -> impl IntoResponse {
503    let limit = params.limit.unwrap_or(100);
504    let offset = params.offset.unwrap_or(0);
505
506    if let Some(ref vault) = params.vault {
507        info!(
508            "Listing runs for vault: {} (limit={}, offset={})",
509            vault, limit, offset
510        );
511    } else {
512        info!("Listing all runs (limit={}, offset={})", limit, offset);
513    }
514
515    let result = if let Some(vault) = params.vault {
516        sqlx::query_as!(
517            models::Run,
518            r#"
519            SELECT id, name, timestamp, command, vault, project_root,
520                   exit_code, duration_ms, stdout, stderr,
521                   created_at, updated_at
522            FROM runs
523            WHERE vault = $1
524            ORDER BY timestamp DESC
525            LIMIT $2 OFFSET $3
526            "#,
527            vault,
528            limit,
529            offset
530        )
531        .fetch_all(&state.pool)
532        .await
533    } else {
534        sqlx::query_as!(
535            models::Run,
536            r#"
537            SELECT id, name, timestamp, command, vault, project_root,
538                   exit_code, duration_ms, stdout, stderr,
539                   created_at, updated_at
540            FROM runs
541            ORDER BY timestamp DESC
542            LIMIT $1 OFFSET $2
543            "#,
544            limit,
545            offset
546        )
547        .fetch_all(&state.pool)
548        .await
549    };
550
551    match result {
552        Ok(runs) => {
553            info!("Found {} runs", runs.len());
554            Json(json!({
555                "status": "ok",
556                "runs": runs,
557                "limit": limit,
558                "offset": offset
559            }))
560        }
561        Err(e) => {
562            error!("Failed to list runs: {}", e);
563            Json(json!({
564                "status": "error",
565                "error": e.to_string()
566            }))
567        }
568    }
569}
570
571/// Search runs with `JSONPath` filters on hook outputs
572#[expect(clippy::too_many_lines, reason = "TODO: Refactor later")]
573async fn search_runs(
574    State(state): State<AppState>,
575    Json(request): Json<models::SearchRunsRequest>,
576) -> impl IntoResponse {
577    info!(
578        "Searching runs: vault={:?}, hook_filters={}",
579        request.vault,
580        request.hook_filters.len()
581    );
582
583    // Build the query
584    let builder = match query::RunQueryBuilder::from_request(&request) {
585        Ok(b) => b,
586        Err(e) => {
587            error!("Failed to build query: {}", e);
588            return Json(json!({
589                "status": "error",
590                "error": e.to_string()
591            }));
592        }
593    };
594
595    let query_sql = builder.build_query();
596    let count_sql = builder.build_count_query();
597    let bind_values = builder.bind_values();
598
599    info!("Executing search query: {}", query_sql);
600
601    // Execute count query first
602    let total: i64 = {
603        let mut query = sqlx::query_scalar::<_, i64>(&count_sql);
604        for value in bind_values {
605            query = match value {
606                query::BindValue::String(s) => query.bind(s.clone()),
607                query::BindValue::I32(i) => query.bind(*i),
608                query::BindValue::I64(i) => query.bind(*i),
609                query::BindValue::DateTime(dt) => query.bind(*dt),
610                query::BindValue::Bool(b) => query.bind(*b),
611            };
612        }
613        match query.fetch_one(&state.pool).await {
614            Ok(count) => count,
615            Err(e) => {
616                error!("Failed to execute count query: {}", e);
617                return Json(json!({
618                    "status": "error",
619                    "error": "Query execution failed"
620                }));
621            }
622        }
623    };
624
625    // Execute main query
626    let runs: Vec<models::Run> = {
627        let mut query = sqlx::query_as::<_, models::Run>(&query_sql);
628        for value in bind_values {
629            query = match value {
630                query::BindValue::String(s) => query.bind(s.clone()),
631                query::BindValue::I32(i) => query.bind(*i),
632                query::BindValue::I64(i) => query.bind(*i),
633                query::BindValue::DateTime(dt) => query.bind(*dt),
634                query::BindValue::Bool(b) => query.bind(*b),
635            };
636        }
637        match query.fetch_all(&state.pool).await {
638            Ok(runs) => runs,
639            Err(e) => {
640                error!("Failed to execute search query: {}", e);
641                return Json(json!({
642                    "status": "error",
643                    "error": "Query execution failed"
644                }));
645            }
646        }
647    };
648
649    info!("Found {} runs (total: {})", runs.len(), total);
650
651    // Determine what to include in response
652    let include_files = request.include.contains(&models::IncludeField::Files);
653    let include_stdout = request.include.contains(&models::IncludeField::Stdout);
654    let include_stderr = request.include.contains(&models::IncludeField::Stderr);
655    let include_hooks = request.include.contains(&models::IncludeField::Hooks);
656
657    // Build response
658    let mut results = Vec::with_capacity(runs.len());
659    for run in runs {
660        let files = if include_files {
661            match sqlx::query_as::<_, models::CapturedFile>(
662                "SELECT path, size, hash, storage_path, content_type \
663                 FROM captured_files WHERE run_id = $1 ORDER BY path",
664            )
665            .bind(&run.id)
666            .fetch_all(&state.pool)
667            .await
668            {
669                Ok(files) => Some(
670                    files
671                        .into_iter()
672                        .map(|f| models::FileInfo {
673                            path: f.path.clone(),
674                            size: f.size,
675                            hash: f.hash,
676                            url: format!("/api/v1/runs/{}/files/{}", run.id, f.path),
677                        })
678                        .collect(),
679                ),
680                Err(e) => {
681                    warn!("Failed to fetch files for run {}: {}", run.id, e);
682                    None
683                }
684            }
685        } else {
686            None
687        };
688
689        let (pre_run_hooks, post_run_hooks) = if include_hooks {
690            match sqlx::query_as::<_, models::RunOutputRow>(
691                "SELECT phase, hook_id, config, output, success, error \
692                 FROM run_outputs WHERE run_id = $1 ORDER BY id",
693            )
694            .bind(&run.id)
695            .fetch_all(&state.pool)
696            .await
697            {
698                Ok(rows) => {
699                    let mut pre_hooks = Vec::new();
700                    let mut post_hooks = Vec::new();
701                    for row in rows {
702                        let hook_output = models::HookOutput {
703                            meta: models::HookMeta {
704                                id: row.hook_id,
705                                config: row.config,
706                                success: row.success,
707                                error: row.error,
708                            },
709                            output: row.output,
710                        };
711                        if row.phase == "pre" {
712                            pre_hooks.push(hook_output);
713                        } else if row.phase == "post" {
714                            post_hooks.push(hook_output);
715                        } else {
716                            // Unknown phase, skip
717                        }
718                    }
719                    (Some(pre_hooks), Some(post_hooks))
720                }
721                Err(e) => {
722                    warn!("Failed to fetch hooks for run {}: {}", run.id, e);
723                    (None, None)
724                }
725            }
726        } else {
727            (None, None)
728        };
729
730        results.push(models::SearchRunResult {
731            id: run.id,
732            name: run.name,
733            timestamp: run.timestamp,
734            vault: run.vault,
735            command: run.command,
736            project_root: run.project_root,
737            exit_code: run.exit_code,
738            duration_ms: run.duration_ms,
739            stdout: if include_stdout { run.stdout } else { None },
740            stderr: if include_stderr { run.stderr } else { None },
741            files,
742            pre_run_hooks,
743            post_run_hooks,
744        });
745    }
746
747    let response = models::SearchRunsResponse {
748        status: "ok".to_string(),
749        total,
750        runs: results,
751    };
752
753    Json(serde_json::to_value(response).expect("Failed to serialize SearchRunsResponse"))
754}
755
756async fn create_run(
757    State(state): State<AppState>,
758    Json(request): Json<models::CreateRunRequest>,
759) -> impl IntoResponse {
760    info!("Creating run: {}", request.id);
761
762    let result = sqlx::query!(
763        r#"
764        INSERT INTO runs (
765            id, name, timestamp, command, vault, project_root,
766            exit_code, duration_ms, stdout, stderr
767        ) VALUES (
768            $1, $2, $3, $4, $5, $6,
769            $7, $8, $9, $10
770        )
771        "#,
772        request.id,
773        request.name,
774        request.timestamp,
775        request.command,
776        request.vault,
777        request.project_root,
778        request.exit_code,
779        request.duration_ms,
780        request.stdout,
781        request.stderr
782    )
783    .execute(&state.pool)
784    .await;
785
786    match result {
787        Ok(_) => {
788            info!("Run created successfully");
789            (
790                StatusCode::CREATED,
791                Json(json!({
792                    "status": "created",
793                    "run": request
794                })),
795            )
796        }
797        Err(e) => {
798            error!("Failed to insert run: {}", e);
799            let status = if e.to_string().contains("duplicate key") {
800                StatusCode::CONFLICT
801            } else {
802                StatusCode::INTERNAL_SERVER_ERROR
803            };
804            (
805                status,
806                Json(json!({
807                    "status": "error",
808                    "error": e.to_string()
809                })),
810            )
811        }
812    }
813}
814
815async fn get_run(State(state): State<AppState>, Path(id): Path<String>) -> impl IntoResponse {
816    info!("Getting run: {}", id);
817
818    let result = sqlx::query_as!(
819        models::Run,
820        r#"
821        SELECT id, name, timestamp, command, vault, project_root,
822               exit_code, duration_ms, stdout, stderr,
823               created_at, updated_at
824        FROM runs
825        WHERE id = $1
826        "#,
827        id
828    )
829    .fetch_optional(&state.pool)
830    .await;
831
832    match result {
833        Ok(Some(run)) => {
834            info!("Found run: {}", run.id);
835
836            // Fetch hook outputs
837            let hook_outputs_result = sqlx::query_as!(
838                models::RunOutputRow,
839                r#"
840                SELECT phase, hook_id, config, output, success, error
841                FROM run_outputs
842                WHERE run_id = $1
843                ORDER BY id
844                "#,
845                id
846            )
847            .fetch_all(&state.pool)
848            .await;
849
850            let (pre_run_hooks, post_run_hooks) = match hook_outputs_result {
851                Ok(rows) => {
852                    let mut pre_hooks = Vec::new();
853                    let mut post_hooks = Vec::new();
854
855                    for row in rows {
856                        let hook_output = models::HookOutput {
857                            meta: models::HookMeta {
858                                id: row.hook_id,
859                                config: row.config,
860                                success: row.success,
861                                error: row.error,
862                            },
863                            output: row.output,
864                        };
865
866                        if row.phase == "pre" {
867                            pre_hooks.push(hook_output);
868                        } else if row.phase == "post" {
869                            post_hooks.push(hook_output);
870                        } else {
871                            warn!("Unknown hook phase: {}", row.phase);
872                        }
873                    }
874
875                    info!(
876                        "Found {} pre-run hooks and {} post-run hooks",
877                        pre_hooks.len(),
878                        post_hooks.len()
879                    );
880                    (pre_hooks, post_hooks)
881                }
882                Err(e) => {
883                    error!("Failed to fetch hook outputs: {}", e);
884                    (Vec::new(), Vec::new())
885                }
886            };
887
888            Json(json!({
889                "status": "ok",
890                "run": run,
891                "pre_run_hooks": pre_run_hooks,
892                "post_run_hooks": post_run_hooks
893            }))
894        }
895        Ok(None) => {
896            info!("Run not found: {}", id);
897            Json(json!({
898                "status": "not_found",
899                "error": format!("Run with id {} not found", id)
900            }))
901        }
902        Err(e) => {
903            error!("Failed to retrieve run: {}", e);
904            Json(json!({
905                "status": "error",
906                "error": e.to_string()
907            }))
908        }
909    }
910}
911
912#[expect(clippy::too_many_lines, reason = "TODO: Refactor later")]
913#[expect(
914    clippy::else_if_without_else,
915    reason = "There is `continue` or `return` in each branch, so `else` is redundant"
916)]
917async fn upload_files(
918    State(state): State<AppState>,
919    mut multipart: Multipart,
920) -> impl IntoResponse {
921    let storage_path = &state.storage_path;
922    info!("Received file upload request");
923
924    if let Err(e) = tokio::fs::create_dir_all(&storage_path).await {
925        error!(
926            "Failed to create storage directory at {}: {}",
927            storage_path.display(),
928            e
929        );
930        return Json(json!({
931            "status": "error",
932            "error": format!("Failed to create storage directory: {}", e)
933        }));
934    }
935
936    let mut files_processed = 0;
937    let mut total_bytes = 0u64;
938    let mut run_id: Option<String> = None;
939    let mut pending_paths: VecDeque<String> = VecDeque::new();
940    let mut pre_run_hooks: Option<Vec<models::HookOutput>> = None;
941    let mut post_run_hooks: Option<Vec<models::HookOutput>> = None;
942
943    while let Ok(Some(field)) = multipart.next_field().await {
944        let field_name = field.name().unwrap_or("unknown").to_string();
945
946        if field_name == "run_id" {
947            match field.text().await {
948                Ok(text) => {
949                    run_id = Some(text);
950                    continue;
951                }
952                Err(e) => {
953                    error!("Failed to read run_id field: {}", e);
954                    return Json(json!({
955                        "status": "error",
956                        "error": format!("Failed to read run_id: {}", e)
957                    }));
958                }
959            }
960        } else if field_name == "path" {
961            match field.text().await {
962                Ok(text) => {
963                    pending_paths.push_back(text);
964                    continue;
965                }
966                Err(e) => {
967                    error!("Failed to read path field: {}", e);
968                    return Json(json!({
969                        "status": "error",
970                        "error": format!("Failed to read path: {}", e)
971                    }));
972                }
973            }
974        } else if field_name == "pre_run" {
975            match field.text().await {
976                Ok(text) => match serde_json::from_str::<Vec<models::HookOutput>>(&text) {
977                    Ok(hooks) => {
978                        info!("Parsed {} pre-run hooks", hooks.len());
979                        pre_run_hooks = Some(hooks);
980                        continue;
981                    }
982                    Err(e) => {
983                        error!("Failed to parse pre_run JSON: {}", e);
984                        return Json(json!({
985                            "status": "error",
986                            "error": format!("Failed to parse pre_run JSON: {}", e)
987                        }));
988                    }
989                },
990                Err(e) => {
991                    error!("Failed to read pre_run field: {}", e);
992                    return Json(json!({
993                        "status": "error",
994                        "error": format!("Failed to read pre_run: {}", e)
995                    }));
996                }
997            }
998        } else if field_name == "post_run" {
999            match field.text().await {
1000                Ok(text) => match serde_json::from_str::<Vec<models::HookOutput>>(&text) {
1001                    Ok(hooks) => {
1002                        info!("Parsed {} post-run hooks", hooks.len());
1003                        post_run_hooks = Some(hooks);
1004                        continue;
1005                    }
1006                    Err(e) => {
1007                        error!("Failed to parse post_run JSON: {}", e);
1008                        return Json(json!({
1009                            "status": "error",
1010                            "error": format!("Failed to parse post_run JSON: {}", e)
1011                        }));
1012                    }
1013                },
1014                Err(e) => {
1015                    error!("Failed to read post_run field: {}", e);
1016                    return Json(json!({
1017                        "status": "error",
1018                        "error": format!("Failed to read post_run: {}", e)
1019                    }));
1020                }
1021            }
1022        }
1023
1024        let file_name = field.file_name().unwrap_or("unknown").to_string();
1025        let content_type = field
1026            .content_type()
1027            .unwrap_or("application/octet-stream")
1028            .to_string();
1029
1030        info!(
1031            "Processing file: field_name={}, file_name={}, content_type={}",
1032            field_name, file_name, content_type
1033        );
1034
1035        match field.bytes().await {
1036            Ok(data) => {
1037                let size = data.len();
1038                total_bytes += size as u64;
1039                let Ok(size_i64) = i64::try_from(size) else {
1040                    error!("File too large to store size in database: {} bytes", size);
1041                    return Json(json!({
1042                        "status": "error",
1043                        "error": "File too large to store size in database"
1044                    }));
1045                };
1046
1047                let mut hasher = Sha256::new();
1048                hasher.update(&data);
1049                let hash = format!("{:x}", hasher.finalize());
1050
1051                info!("File hash: {}, size: {} bytes", hash, size);
1052
1053                let hash_dir = &hash[0..2];
1054                let file_storage_dir = storage_path.join(hash_dir);
1055                if let Err(e) = tokio::fs::create_dir_all(&file_storage_dir).await {
1056                    error!(
1057                        "Failed to create hash directory at {}: {}",
1058                        file_storage_dir.display(),
1059                        e
1060                    );
1061                    return Json(json!({
1062                        "status": "error",
1063                        "error": format!("Failed to create storage directory: {}", e)
1064                    }));
1065                }
1066
1067                let file_storage_path = file_storage_dir.join(&hash);
1068                let storage_path_str = file_storage_path.to_string_lossy().to_string();
1069
1070                if file_storage_path.exists() {
1071                    info!("File already exists (deduplicated): {}", storage_path_str);
1072                } else {
1073                    if let Err(e) = tokio::fs::write(&file_storage_path, &data).await {
1074                        error!("Failed to write file: {}", e);
1075                        return Json(json!({
1076                            "status": "error",
1077                            "error": format!("Failed to write file: {}", e)
1078                        }));
1079                    }
1080                    info!("Saved new file: {}", storage_path_str);
1081                }
1082
1083                if let Some(ref rid) = run_id {
1084                    let relative_path = pending_paths
1085                        .pop_front()
1086                        .or_else(|| {
1087                            if file_name == "unknown" {
1088                                None
1089                            } else {
1090                                Some(file_name.clone())
1091                            }
1092                        })
1093                        .unwrap_or_else(|| {
1094                            if field_name != "unknown" && field_name != "file" {
1095                                field_name.clone()
1096                            } else {
1097                                format!("file-{}", files_processed + 1)
1098                            }
1099                        });
1100
1101                    let result = sqlx::query!(
1102                        r#"
1103                        INSERT INTO captured_files (run_id, path, size, hash, storage_path, content_type)
1104                        VALUES ($1, $2, $3, $4, $5, $6)
1105                        ON CONFLICT (run_id, path) DO UPDATE
1106                        SET size = EXCLUDED.size,
1107                            hash = EXCLUDED.hash,
1108                            storage_path = EXCLUDED.storage_path,
1109                            content_type = EXCLUDED.content_type
1110                        "#,
1111                        rid,
1112                        relative_path,
1113                        size_i64,
1114                        hash,
1115                        storage_path_str,
1116                        content_type
1117                    )
1118                    .execute(&state.pool)
1119                    .await;
1120
1121                    match result {
1122                        Ok(_) => {
1123                            info!("Stored file metadata in database");
1124                        }
1125                        Err(e) => {
1126                            error!("Failed to store file metadata: {}", e);
1127                            return Json(json!({
1128                                "status": "error",
1129                                "error": format!("Failed to store file metadata: {}", e)
1130                            }));
1131                        }
1132                    }
1133                }
1134
1135                files_processed += 1;
1136                info!("Successfully processed file: {} bytes", size);
1137            }
1138            Err(e) => {
1139                error!("Failed to read file field: {}", e);
1140                return Json(json!({
1141                    "status": "error",
1142                    "error": format!("Failed to read file: {}", e)
1143                }));
1144            }
1145        }
1146    }
1147
1148    info!(
1149        "Upload complete: {} files, {} bytes total",
1150        files_processed, total_bytes
1151    );
1152
1153    // Store hook outputs if provided
1154    let mut pre_run_count = 0;
1155    let mut post_run_count = 0;
1156
1157    if let Some(ref rid) = run_id {
1158        if let Some(hooks) = pre_run_hooks {
1159            for hook in hooks {
1160                let result = sqlx::query!(
1161                    r#"
1162                    INSERT INTO run_outputs (run_id, phase, hook_id, config, output, success, error)
1163                    VALUES ($1, 'pre', $2, $3, $4, $5, $6)
1164                    "#,
1165                    rid,
1166                    hook.meta.id,
1167                    hook.meta.config,
1168                    hook.output,
1169                    hook.meta.success,
1170                    hook.meta.error
1171                )
1172                .execute(&state.pool)
1173                .await;
1174
1175                match result {
1176                    Ok(_) => {
1177                        pre_run_count += 1;
1178                        info!("Stored pre-run hook: {}", hook.meta.id);
1179                    }
1180                    Err(e) => {
1181                        error!("Failed to store pre-run hook {}: {}", hook.meta.id, e);
1182                        return Json(json!({
1183                            "status": "error",
1184                            "error": format!("Failed to store pre-run hook {}: {}", hook.meta.id, e)
1185                        }));
1186                    }
1187                }
1188            }
1189        }
1190
1191        if let Some(hooks) = post_run_hooks {
1192            for hook in hooks {
1193                let result = sqlx::query!(
1194                    r#"
1195                    INSERT INTO run_outputs (run_id, phase, hook_id, config, output, success, error)
1196                    VALUES ($1, 'post', $2, $3, $4, $5, $6)
1197                    "#,
1198                    rid,
1199                    hook.meta.id,
1200                    hook.meta.config,
1201                    hook.output,
1202                    hook.meta.success,
1203                    hook.meta.error
1204                )
1205                .execute(&state.pool)
1206                .await;
1207
1208                match result {
1209                    Ok(_) => {
1210                        post_run_count += 1;
1211                        info!("Stored post-run hook: {}", hook.meta.id);
1212                    }
1213                    Err(e) => {
1214                        error!("Failed to store post-run hook {}: {}", hook.meta.id, e);
1215                        return Json(json!({
1216                            "status": "error",
1217                            "error": format!("Failed to store post-run hook {}: {}", hook.meta.id, e)
1218                        }));
1219                    }
1220                }
1221            }
1222        }
1223    }
1224
1225    info!(
1226        "Hook outputs stored: {} pre-run, {} post-run",
1227        pre_run_count, post_run_count
1228    );
1229
1230    let response = capsula_api_types::UploadResponse {
1231        status: "ok".to_string(),
1232        files_processed,
1233        total_bytes,
1234        pre_run_hooks: pre_run_count,
1235        post_run_hooks: post_run_count,
1236    };
1237    Json(serde_json::to_value(response).expect("Failed to serialize UploadResponse"))
1238}
1239
1240async fn download_file(
1241    State(state): State<AppState>,
1242    Path((run_id, file_path)): Path<(String, String)>,
1243) -> Result<Response, StatusCode> {
1244    info!("Downloading file: run_id={}, path={}", run_id, file_path);
1245
1246    // Query the database for file metadata
1247    let file_record = sqlx::query!(
1248        r#"
1249        SELECT storage_path, content_type, path as file_path
1250        FROM captured_files
1251        WHERE run_id = $1 AND path = $2
1252        "#,
1253        run_id,
1254        file_path
1255    )
1256    .fetch_optional(&state.pool)
1257    .await
1258    .map_err(|e| {
1259        error!("Database error while fetching file metadata: {}", e);
1260        StatusCode::INTERNAL_SERVER_ERROR
1261    })?;
1262
1263    let Some(record) = file_record else {
1264        info!("File not found: run_id={}, path={}", run_id, file_path);
1265        return Err(StatusCode::NOT_FOUND);
1266    };
1267
1268    // Read the file from storage
1269    let file_data = tokio::fs::read(&record.storage_path).await.map_err(|e| {
1270        error!(
1271            "Failed to read file from storage {}: {}",
1272            record.storage_path, e
1273        );
1274        StatusCode::INTERNAL_SERVER_ERROR
1275    })?;
1276
1277    info!(
1278        "Successfully read file: {} bytes from {}",
1279        file_data.len(),
1280        record.storage_path
1281    );
1282
1283    // Determine content type (use stored value or guess from filename)
1284    let content_type = record.content_type.unwrap_or_else(|| {
1285        mime_guess::from_path(&record.file_path)
1286            .first_or_octet_stream()
1287            .to_string()
1288    });
1289
1290    // Extract filename from path for Content-Disposition
1291    let filename = std::path::Path::new(&record.file_path)
1292        .file_name()
1293        .and_then(|n| n.to_str())
1294        .unwrap_or("download");
1295
1296    // Build response with appropriate headers
1297    Response::builder()
1298        .status(StatusCode::OK)
1299        .header(header::CONTENT_TYPE, content_type)
1300        .header(
1301            header::CONTENT_DISPOSITION,
1302            format!("inline; filename=\"{filename}\""),
1303        )
1304        .body(Body::from(file_data))
1305        .map_err(|e| {
1306            error!("Failed to build response: {}", e);
1307            StatusCode::INTERNAL_SERVER_ERROR
1308        })
1309}