1use askama::Template;
2use askama_web::WebTemplate;
3
4#[expect(
5 clippy::unnecessary_wraps,
6 reason = "Askama filter_fn macro generates code that triggers this lint, but Result return type is required by the Askama filter API"
7)]
8#[expect(
9 clippy::inline_always,
10 clippy::unused_self,
11 reason = "Askama's #[filter_fn] macro generates builder pattern code with #[inline(always)] that triggers these lints"
12)]
13mod filters {
14 use askama::filter_fn;
15
16 #[filter_fn]
17 pub fn format_command(s: &str, _env: &dyn askama::Values) -> ::askama::Result<String> {
18 Ok(serde_json::from_str::<Vec<String>>(s).map_or_else(
20 |_| s.to_string(),
21 |cmd_array| {
22 shlex::try_join(cmd_array.iter().map(String::as_str))
23 .unwrap_or_else(|_| cmd_array.join(" "))
24 },
25 ))
26 }
27
28 #[filter_fn]
29 pub fn pretty_json<T: std::fmt::Display>(
30 s: T,
31 _env: &dyn askama::Values,
32 ) -> ::askama::Result<String> {
33 let s_str = s.to_string();
34 Ok(serde_json::from_str::<serde_json::Value>(&s_str)
36 .ok()
37 .and_then(|value| serde_json::to_string_pretty(&value).ok())
38 .unwrap_or(s_str))
39 }
40}
41
42use axum::{
43 Router,
44 body::Body,
45 extract::{DefaultBodyLimit, Multipart, Path, Query, State},
46 http::{StatusCode, header},
47 response::{IntoResponse, Json, Response},
48 routing::{get, post},
49};
50use capsula_api_types::VaultInfo;
51use serde_json::json;
52use sha2::{Digest, Sha256};
53use sqlx::PgPool;
54use std::collections::VecDeque;
55use std::path::PathBuf;
56use tower_http::services::ServeDir;
57use tracing::{error, info, warn};
58
59#[derive(Clone)]
60pub struct AppState {
61 pub pool: PgPool,
62 pub storage_path: PathBuf,
63}
64
65mod models;
66mod query;
67
68#[derive(Template, WebTemplate)]
69#[template(path = "index.html")]
70struct IndexTemplate;
71
72#[derive(Template, WebTemplate)]
73#[template(path = "vaults.html")]
74struct VaultsTemplate {
75 vaults: Vec<VaultInfo>,
76}
77
78#[derive(Template, WebTemplate)]
79#[template(path = "runs.html")]
80struct RunsTemplate {
81 runs: Vec<models::Run>,
82 vault: Option<String>,
83 page: i64,
84 total_pages: i64,
85}
86
87#[derive(Template, WebTemplate)]
88#[template(path = "run_detail.html")]
89struct RunDetailTemplate {
90 run: models::Run,
91 pre_run_hooks: Vec<models::HookOutput>,
92 post_run_hooks: Vec<models::HookOutput>,
93 files: Vec<models::CapturedFile>,
94}
95
96#[derive(Template, WebTemplate)]
97#[template(path = "error.html")]
98struct ErrorTemplate {
99 status_code: u16,
100 title: String,
101 message: String,
102}
103
104pub async fn create_pool(database_url: &str, max_connections: u32) -> Result<PgPool, sqlx::Error> {
105 sqlx::postgres::PgPoolOptions::new()
106 .max_connections(max_connections)
107 .acquire_timeout(std::time::Duration::from_secs(3))
108 .connect(database_url)
109 .await
110}
111
112pub fn build_app(pool: PgPool, storage_path: PathBuf, max_body_size: usize) -> Router {
113 let static_dir: PathBuf = std::env::var("CAPSULA_STATIC_DIR")
114 .expect("CAPSULA_STATIC_DIR environment variable must be set")
115 .into();
116
117 let state = AppState { pool, storage_path };
118
119 Router::new()
120 .route("/", get(index))
121 .route("/vaults", get(vaults_page))
122 .route("/runs", get(runs_page))
123 .route("/runs/{id}", get(run_detail_page))
124 .route("/health", get(health_check))
125 .route("/api/v1/vaults", get(list_vaults))
126 .route("/api/v1/vaults/{name}", get(get_vault_info))
127 .route("/api/v1/runs", post(create_run).get(list_runs))
128 .route("/api/v1/runs/search", post(search_runs))
129 .route("/api/v1/runs/{id}", get(get_run))
130 .route("/api/v1/runs/{id}/files/{*path}", get(download_file))
131 .route(
132 "/api/v1/upload",
133 post(upload_files).layer(DefaultBodyLimit::max(max_body_size)),
134 )
135 .nest_service("/static", ServeDir::new(static_dir))
136 .fallback(not_found)
137 .with_state(state)
138}
139
140async fn index() -> impl IntoResponse {
141 IndexTemplate
142}
143
144async fn not_found() -> impl IntoResponse {
145 (
146 StatusCode::NOT_FOUND,
147 ErrorTemplate {
148 status_code: 404,
149 title: "Page Not Found".to_string(),
150 message: "The page you are looking for does not exist.".to_string(),
151 },
152 )
153}
154
155async fn vaults_page(State(state): State<AppState>) -> impl IntoResponse {
156 info!("Rendering vaults page");
157
158 let result = sqlx::query!(
159 r#"
160 SELECT vault as name, COUNT(*) as "run_count!"
161 FROM runs
162 GROUP BY vault
163 ORDER BY vault
164 "#
165 )
166 .fetch_all(&state.pool)
167 .await;
168
169 match result {
170 Ok(rows) => {
171 let vaults: Vec<VaultInfo> = rows
172 .into_iter()
173 .map(|row| VaultInfo {
174 name: row.name,
175 run_count: row.run_count,
176 })
177 .collect();
178 VaultsTemplate { vaults }
179 }
180 Err(e) => {
181 error!("Failed to fetch vaults: {}", e);
182 VaultsTemplate { vaults: Vec::new() }
184 }
185 }
186}
187
188async fn runs_page(
189 State(state): State<AppState>,
190 Query(params): Query<models::ListRunsQuery>,
191) -> impl IntoResponse {
192 let page = params.offset.unwrap_or(0) / params.limit.unwrap_or(50) + 1;
193 let limit = params.limit.unwrap_or(50);
194 let offset = (page - 1) * limit;
195
196 info!(
197 "Rendering runs page: page={}, vault={:?}",
198 page, params.vault
199 );
200
201 let total_count = if let Some(ref vault) = params.vault {
203 sqlx::query_scalar!(
204 r#"
205 SELECT COUNT(*)::bigint
206 FROM runs
207 WHERE vault = $1
208 "#,
209 vault
210 )
211 .fetch_one(&state.pool)
212 .await
213 .unwrap_or(Some(0))
214 .unwrap_or(0)
215 } else {
216 sqlx::query_scalar!(
217 r#"
218 SELECT COUNT(*)::bigint
219 FROM runs
220 "#
221 )
222 .fetch_one(&state.pool)
223 .await
224 .unwrap_or(Some(0))
225 .unwrap_or(0)
226 };
227
228 let total_pages = (total_count + limit - 1) / limit;
229
230 let runs_result = if let Some(ref vault) = params.vault {
232 sqlx::query_as!(
233 models::Run,
234 r#"
235 SELECT id, name, timestamp, command, vault, project_root,
236 exit_code, duration_ms, stdout, stderr,
237 created_at, updated_at
238 FROM runs
239 WHERE vault = $1
240 ORDER BY timestamp DESC
241 LIMIT $2 OFFSET $3
242 "#,
243 vault,
244 limit,
245 offset
246 )
247 .fetch_all(&state.pool)
248 .await
249 } else {
250 sqlx::query_as!(
251 models::Run,
252 r#"
253 SELECT id, name, timestamp, command, vault, project_root,
254 exit_code, duration_ms, stdout, stderr,
255 created_at, updated_at
256 FROM runs
257 ORDER BY timestamp DESC
258 LIMIT $1 OFFSET $2
259 "#,
260 limit,
261 offset
262 )
263 .fetch_all(&state.pool)
264 .await
265 };
266
267 match runs_result {
268 Ok(runs) => RunsTemplate {
269 runs,
270 vault: params.vault,
271 page,
272 total_pages,
273 },
274 Err(e) => {
275 error!("Failed to fetch runs: {}", e);
276 RunsTemplate {
277 runs: Vec::new(),
278 vault: params.vault,
279 page: 1,
280 total_pages: 1,
281 }
282 }
283 }
284}
285
286async fn run_detail_page(
287 State(state): State<AppState>,
288 Path(id): Path<String>,
289) -> Result<RunDetailTemplate, StatusCode> {
290 info!("Rendering run detail page for: {}", id);
291
292 let run = sqlx::query_as!(
294 models::Run,
295 r#"
296 SELECT id, name, timestamp, command, vault, project_root,
297 exit_code, duration_ms, stdout, stderr,
298 created_at, updated_at
299 FROM runs
300 WHERE id = $1
301 "#,
302 id
303 )
304 .fetch_optional(&state.pool)
305 .await
306 .map_err(|e| {
307 error!("Database error while fetching run: {}", e);
308 StatusCode::INTERNAL_SERVER_ERROR
309 })?
310 .ok_or_else(|| {
311 info!("Run not found: {}", id);
312 StatusCode::NOT_FOUND
313 })?;
314
315 let hook_outputs_result = sqlx::query_as!(
317 models::RunOutputRow,
318 r#"
319 SELECT phase, hook_id, config, output, success, error
320 FROM run_outputs
321 WHERE run_id = $1
322 ORDER BY id
323 "#,
324 id
325 )
326 .fetch_all(&state.pool)
327 .await;
328
329 let (pre_run_hooks, post_run_hooks) = match hook_outputs_result {
330 Ok(rows) => {
331 let mut pre_hooks = Vec::new();
332 let mut post_hooks = Vec::new();
333
334 for row in rows {
335 let hook_output = models::HookOutput {
336 meta: models::HookMeta {
337 id: row.hook_id,
338 config: row.config,
339 success: row.success,
340 error: row.error,
341 },
342 output: row.output,
343 };
344
345 if row.phase == "pre" {
346 pre_hooks.push(hook_output);
347 } else if row.phase == "post" {
348 post_hooks.push(hook_output);
349 } else {
350 warn!("Unknown hook phase: {}", row.phase);
351 }
352 }
353
354 (pre_hooks, post_hooks)
355 }
356 Err(e) => {
357 error!("Failed to fetch hook outputs: {}", e);
358 (Vec::new(), Vec::new())
359 }
360 };
361
362 let files_result = sqlx::query_as!(
364 models::CapturedFile,
365 r#"
366 SELECT path, size, hash, storage_path, content_type
367 FROM captured_files
368 WHERE run_id = $1
369 ORDER BY path
370 "#,
371 id
372 )
373 .fetch_all(&state.pool)
374 .await;
375
376 let files = match files_result {
377 Ok(files) => files,
378 Err(e) => {
379 error!("Failed to fetch captured files: {}", e);
380 Vec::new()
381 }
382 };
383
384 Ok(RunDetailTemplate {
385 run,
386 pre_run_hooks,
387 post_run_hooks,
388 files,
389 })
390}
391
392async fn health_check(State(state): State<AppState>) -> impl IntoResponse {
393 match sqlx::query("SELECT 1").fetch_one(&state.pool).await {
394 Ok(_) => Json(json!({
395 "status": "ok",
396 "database": "connected"
397 })),
398 Err(e) => Json(json!({
399 "status": "error",
400 "database": "disconnected",
401 "error": e.to_string()
402 })),
403 }
404}
405
406async fn list_vaults(State(state): State<AppState>) -> impl IntoResponse {
407 info!("Listing all vaults");
408
409 let result = sqlx::query!(
410 r#"
411 SELECT vault as name, COUNT(*) as "run_count!"
412 FROM runs
413 GROUP BY vault
414 ORDER BY vault
415 "#
416 )
417 .fetch_all(&state.pool)
418 .await;
419
420 match result {
421 Ok(rows) => {
422 let vaults: Vec<VaultInfo> = rows
423 .into_iter()
424 .map(|row| VaultInfo {
425 name: row.name,
426 run_count: row.run_count,
427 })
428 .collect();
429 info!("Found {} vaults", vaults.len());
430 let response = capsula_api_types::VaultsResponse {
431 status: "ok".to_string(),
432 vaults,
433 };
434 Json(serde_json::to_value(response).expect("Failed to serialize VaultsResponse"))
435 }
436 Err(e) => {
437 error!("Failed to list vaults: {}", e);
438 let response = capsula_api_types::ErrorResponse {
439 status: "error".to_string(),
440 error: e.to_string(),
441 };
442 Json(serde_json::to_value(response).expect("Failed to serialize ErrorResponse"))
443 }
444 }
445}
446
447async fn get_vault_info(
448 State(state): State<AppState>,
449 Path(name): Path<String>,
450) -> impl IntoResponse {
451 info!("Getting vault info: {}", name);
452
453 let result = sqlx::query!(
454 r#"
455 SELECT vault as name, COUNT(*) as "run_count!"
456 FROM runs
457 WHERE vault = $1
458 GROUP BY vault
459 "#,
460 name
461 )
462 .fetch_optional(&state.pool)
463 .await;
464
465 match result {
466 Ok(Some(row)) => {
467 let vault = VaultInfo {
468 name: row.name,
469 run_count: row.run_count,
470 };
471 info!("Found vault: {} with {} runs", vault.name, vault.run_count);
472 let response = capsula_api_types::VaultExistsResponse {
473 status: "ok".to_string(),
474 exists: true,
475 vault: Some(vault),
476 };
477 Json(serde_json::to_value(response).expect("Failed to serialize VaultExistsResponse"))
478 }
479 Ok(None) => {
480 info!("Vault not found: {}", name);
481 let response = capsula_api_types::VaultExistsResponse {
482 status: "ok".to_string(),
483 exists: false,
484 vault: None,
485 };
486 Json(serde_json::to_value(response).expect("Failed to serialize VaultExistsResponse"))
487 }
488 Err(e) => {
489 error!("Failed to get vault info: {}", e);
490 let response = capsula_api_types::ErrorResponse {
491 status: "error".to_string(),
492 error: e.to_string(),
493 };
494 Json(serde_json::to_value(response).expect("Failed to serialize ErrorResponse"))
495 }
496 }
497}
498
499async fn list_runs(
500 State(state): State<AppState>,
501 Query(params): Query<models::ListRunsQuery>,
502) -> impl IntoResponse {
503 let limit = params.limit.unwrap_or(100);
504 let offset = params.offset.unwrap_or(0);
505
506 if let Some(ref vault) = params.vault {
507 info!(
508 "Listing runs for vault: {} (limit={}, offset={})",
509 vault, limit, offset
510 );
511 } else {
512 info!("Listing all runs (limit={}, offset={})", limit, offset);
513 }
514
515 let result = if let Some(vault) = params.vault {
516 sqlx::query_as!(
517 models::Run,
518 r#"
519 SELECT id, name, timestamp, command, vault, project_root,
520 exit_code, duration_ms, stdout, stderr,
521 created_at, updated_at
522 FROM runs
523 WHERE vault = $1
524 ORDER BY timestamp DESC
525 LIMIT $2 OFFSET $3
526 "#,
527 vault,
528 limit,
529 offset
530 )
531 .fetch_all(&state.pool)
532 .await
533 } else {
534 sqlx::query_as!(
535 models::Run,
536 r#"
537 SELECT id, name, timestamp, command, vault, project_root,
538 exit_code, duration_ms, stdout, stderr,
539 created_at, updated_at
540 FROM runs
541 ORDER BY timestamp DESC
542 LIMIT $1 OFFSET $2
543 "#,
544 limit,
545 offset
546 )
547 .fetch_all(&state.pool)
548 .await
549 };
550
551 match result {
552 Ok(runs) => {
553 info!("Found {} runs", runs.len());
554 Json(json!({
555 "status": "ok",
556 "runs": runs,
557 "limit": limit,
558 "offset": offset
559 }))
560 }
561 Err(e) => {
562 error!("Failed to list runs: {}", e);
563 Json(json!({
564 "status": "error",
565 "error": e.to_string()
566 }))
567 }
568 }
569}
570
571#[expect(clippy::too_many_lines, reason = "TODO: Refactor later")]
573async fn search_runs(
574 State(state): State<AppState>,
575 Json(request): Json<models::SearchRunsRequest>,
576) -> impl IntoResponse {
577 info!(
578 "Searching runs: vault={:?}, hook_filters={}",
579 request.vault,
580 request.hook_filters.len()
581 );
582
583 let builder = match query::RunQueryBuilder::from_request(&request) {
585 Ok(b) => b,
586 Err(e) => {
587 error!("Failed to build query: {}", e);
588 return Json(json!({
589 "status": "error",
590 "error": e.to_string()
591 }));
592 }
593 };
594
595 let query_sql = builder.build_query();
596 let count_sql = builder.build_count_query();
597 let bind_values = builder.bind_values();
598
599 info!("Executing search query: {}", query_sql);
600
601 let total: i64 = {
603 let mut query = sqlx::query_scalar::<_, i64>(&count_sql);
604 for value in bind_values {
605 query = match value {
606 query::BindValue::String(s) => query.bind(s.clone()),
607 query::BindValue::I32(i) => query.bind(*i),
608 query::BindValue::I64(i) => query.bind(*i),
609 query::BindValue::DateTime(dt) => query.bind(*dt),
610 query::BindValue::Bool(b) => query.bind(*b),
611 };
612 }
613 match query.fetch_one(&state.pool).await {
614 Ok(count) => count,
615 Err(e) => {
616 error!("Failed to execute count query: {}", e);
617 return Json(json!({
618 "status": "error",
619 "error": "Query execution failed"
620 }));
621 }
622 }
623 };
624
625 let runs: Vec<models::Run> = {
627 let mut query = sqlx::query_as::<_, models::Run>(&query_sql);
628 for value in bind_values {
629 query = match value {
630 query::BindValue::String(s) => query.bind(s.clone()),
631 query::BindValue::I32(i) => query.bind(*i),
632 query::BindValue::I64(i) => query.bind(*i),
633 query::BindValue::DateTime(dt) => query.bind(*dt),
634 query::BindValue::Bool(b) => query.bind(*b),
635 };
636 }
637 match query.fetch_all(&state.pool).await {
638 Ok(runs) => runs,
639 Err(e) => {
640 error!("Failed to execute search query: {}", e);
641 return Json(json!({
642 "status": "error",
643 "error": "Query execution failed"
644 }));
645 }
646 }
647 };
648
649 info!("Found {} runs (total: {})", runs.len(), total);
650
651 let include_files = request.include.contains(&models::IncludeField::Files);
653 let include_stdout = request.include.contains(&models::IncludeField::Stdout);
654 let include_stderr = request.include.contains(&models::IncludeField::Stderr);
655 let include_hooks = request.include.contains(&models::IncludeField::Hooks);
656
657 let mut results = Vec::with_capacity(runs.len());
659 for run in runs {
660 let files = if include_files {
661 match sqlx::query_as::<_, models::CapturedFile>(
662 "SELECT path, size, hash, storage_path, content_type \
663 FROM captured_files WHERE run_id = $1 ORDER BY path",
664 )
665 .bind(&run.id)
666 .fetch_all(&state.pool)
667 .await
668 {
669 Ok(files) => Some(
670 files
671 .into_iter()
672 .map(|f| models::FileInfo {
673 path: f.path.clone(),
674 size: f.size,
675 hash: f.hash,
676 url: format!("/api/v1/runs/{}/files/{}", run.id, f.path),
677 })
678 .collect(),
679 ),
680 Err(e) => {
681 warn!("Failed to fetch files for run {}: {}", run.id, e);
682 None
683 }
684 }
685 } else {
686 None
687 };
688
689 let (pre_run_hooks, post_run_hooks) = if include_hooks {
690 match sqlx::query_as::<_, models::RunOutputRow>(
691 "SELECT phase, hook_id, config, output, success, error \
692 FROM run_outputs WHERE run_id = $1 ORDER BY id",
693 )
694 .bind(&run.id)
695 .fetch_all(&state.pool)
696 .await
697 {
698 Ok(rows) => {
699 let mut pre_hooks = Vec::new();
700 let mut post_hooks = Vec::new();
701 for row in rows {
702 let hook_output = models::HookOutput {
703 meta: models::HookMeta {
704 id: row.hook_id,
705 config: row.config,
706 success: row.success,
707 error: row.error,
708 },
709 output: row.output,
710 };
711 if row.phase == "pre" {
712 pre_hooks.push(hook_output);
713 } else if row.phase == "post" {
714 post_hooks.push(hook_output);
715 } else {
716 }
718 }
719 (Some(pre_hooks), Some(post_hooks))
720 }
721 Err(e) => {
722 warn!("Failed to fetch hooks for run {}: {}", run.id, e);
723 (None, None)
724 }
725 }
726 } else {
727 (None, None)
728 };
729
730 results.push(models::SearchRunResult {
731 id: run.id,
732 name: run.name,
733 timestamp: run.timestamp,
734 vault: run.vault,
735 command: run.command,
736 project_root: run.project_root,
737 exit_code: run.exit_code,
738 duration_ms: run.duration_ms,
739 stdout: if include_stdout { run.stdout } else { None },
740 stderr: if include_stderr { run.stderr } else { None },
741 files,
742 pre_run_hooks,
743 post_run_hooks,
744 });
745 }
746
747 let response = models::SearchRunsResponse {
748 status: "ok".to_string(),
749 total,
750 runs: results,
751 };
752
753 Json(serde_json::to_value(response).expect("Failed to serialize SearchRunsResponse"))
754}
755
756async fn create_run(
757 State(state): State<AppState>,
758 Json(request): Json<models::CreateRunRequest>,
759) -> impl IntoResponse {
760 info!("Creating run: {}", request.id);
761
762 let result = sqlx::query!(
763 r#"
764 INSERT INTO runs (
765 id, name, timestamp, command, vault, project_root,
766 exit_code, duration_ms, stdout, stderr
767 ) VALUES (
768 $1, $2, $3, $4, $5, $6,
769 $7, $8, $9, $10
770 )
771 "#,
772 request.id,
773 request.name,
774 request.timestamp,
775 request.command,
776 request.vault,
777 request.project_root,
778 request.exit_code,
779 request.duration_ms,
780 request.stdout,
781 request.stderr
782 )
783 .execute(&state.pool)
784 .await;
785
786 match result {
787 Ok(_) => {
788 info!("Run created successfully");
789 (
790 StatusCode::CREATED,
791 Json(json!({
792 "status": "created",
793 "run": request
794 })),
795 )
796 }
797 Err(e) => {
798 error!("Failed to insert run: {}", e);
799 let status = if e.to_string().contains("duplicate key") {
800 StatusCode::CONFLICT
801 } else {
802 StatusCode::INTERNAL_SERVER_ERROR
803 };
804 (
805 status,
806 Json(json!({
807 "status": "error",
808 "error": e.to_string()
809 })),
810 )
811 }
812 }
813}
814
815async fn get_run(State(state): State<AppState>, Path(id): Path<String>) -> impl IntoResponse {
816 info!("Getting run: {}", id);
817
818 let result = sqlx::query_as!(
819 models::Run,
820 r#"
821 SELECT id, name, timestamp, command, vault, project_root,
822 exit_code, duration_ms, stdout, stderr,
823 created_at, updated_at
824 FROM runs
825 WHERE id = $1
826 "#,
827 id
828 )
829 .fetch_optional(&state.pool)
830 .await;
831
832 match result {
833 Ok(Some(run)) => {
834 info!("Found run: {}", run.id);
835
836 let hook_outputs_result = sqlx::query_as!(
838 models::RunOutputRow,
839 r#"
840 SELECT phase, hook_id, config, output, success, error
841 FROM run_outputs
842 WHERE run_id = $1
843 ORDER BY id
844 "#,
845 id
846 )
847 .fetch_all(&state.pool)
848 .await;
849
850 let (pre_run_hooks, post_run_hooks) = match hook_outputs_result {
851 Ok(rows) => {
852 let mut pre_hooks = Vec::new();
853 let mut post_hooks = Vec::new();
854
855 for row in rows {
856 let hook_output = models::HookOutput {
857 meta: models::HookMeta {
858 id: row.hook_id,
859 config: row.config,
860 success: row.success,
861 error: row.error,
862 },
863 output: row.output,
864 };
865
866 if row.phase == "pre" {
867 pre_hooks.push(hook_output);
868 } else if row.phase == "post" {
869 post_hooks.push(hook_output);
870 } else {
871 warn!("Unknown hook phase: {}", row.phase);
872 }
873 }
874
875 info!(
876 "Found {} pre-run hooks and {} post-run hooks",
877 pre_hooks.len(),
878 post_hooks.len()
879 );
880 (pre_hooks, post_hooks)
881 }
882 Err(e) => {
883 error!("Failed to fetch hook outputs: {}", e);
884 (Vec::new(), Vec::new())
885 }
886 };
887
888 Json(json!({
889 "status": "ok",
890 "run": run,
891 "pre_run_hooks": pre_run_hooks,
892 "post_run_hooks": post_run_hooks
893 }))
894 }
895 Ok(None) => {
896 info!("Run not found: {}", id);
897 Json(json!({
898 "status": "not_found",
899 "error": format!("Run with id {} not found", id)
900 }))
901 }
902 Err(e) => {
903 error!("Failed to retrieve run: {}", e);
904 Json(json!({
905 "status": "error",
906 "error": e.to_string()
907 }))
908 }
909 }
910}
911
912#[expect(clippy::too_many_lines, reason = "TODO: Refactor later")]
913#[expect(
914 clippy::else_if_without_else,
915 reason = "There is `continue` or `return` in each branch, so `else` is redundant"
916)]
917async fn upload_files(
918 State(state): State<AppState>,
919 mut multipart: Multipart,
920) -> impl IntoResponse {
921 let storage_path = &state.storage_path;
922 info!("Received file upload request");
923
924 if let Err(e) = tokio::fs::create_dir_all(&storage_path).await {
925 error!(
926 "Failed to create storage directory at {}: {}",
927 storage_path.display(),
928 e
929 );
930 return Json(json!({
931 "status": "error",
932 "error": format!("Failed to create storage directory: {}", e)
933 }));
934 }
935
936 let mut files_processed = 0;
937 let mut total_bytes = 0u64;
938 let mut run_id: Option<String> = None;
939 let mut pending_paths: VecDeque<String> = VecDeque::new();
940 let mut pre_run_hooks: Option<Vec<models::HookOutput>> = None;
941 let mut post_run_hooks: Option<Vec<models::HookOutput>> = None;
942
943 while let Ok(Some(field)) = multipart.next_field().await {
944 let field_name = field.name().unwrap_or("unknown").to_string();
945
946 if field_name == "run_id" {
947 match field.text().await {
948 Ok(text) => {
949 run_id = Some(text);
950 continue;
951 }
952 Err(e) => {
953 error!("Failed to read run_id field: {}", e);
954 return Json(json!({
955 "status": "error",
956 "error": format!("Failed to read run_id: {}", e)
957 }));
958 }
959 }
960 } else if field_name == "path" {
961 match field.text().await {
962 Ok(text) => {
963 pending_paths.push_back(text);
964 continue;
965 }
966 Err(e) => {
967 error!("Failed to read path field: {}", e);
968 return Json(json!({
969 "status": "error",
970 "error": format!("Failed to read path: {}", e)
971 }));
972 }
973 }
974 } else if field_name == "pre_run" {
975 match field.text().await {
976 Ok(text) => match serde_json::from_str::<Vec<models::HookOutput>>(&text) {
977 Ok(hooks) => {
978 info!("Parsed {} pre-run hooks", hooks.len());
979 pre_run_hooks = Some(hooks);
980 continue;
981 }
982 Err(e) => {
983 error!("Failed to parse pre_run JSON: {}", e);
984 return Json(json!({
985 "status": "error",
986 "error": format!("Failed to parse pre_run JSON: {}", e)
987 }));
988 }
989 },
990 Err(e) => {
991 error!("Failed to read pre_run field: {}", e);
992 return Json(json!({
993 "status": "error",
994 "error": format!("Failed to read pre_run: {}", e)
995 }));
996 }
997 }
998 } else if field_name == "post_run" {
999 match field.text().await {
1000 Ok(text) => match serde_json::from_str::<Vec<models::HookOutput>>(&text) {
1001 Ok(hooks) => {
1002 info!("Parsed {} post-run hooks", hooks.len());
1003 post_run_hooks = Some(hooks);
1004 continue;
1005 }
1006 Err(e) => {
1007 error!("Failed to parse post_run JSON: {}", e);
1008 return Json(json!({
1009 "status": "error",
1010 "error": format!("Failed to parse post_run JSON: {}", e)
1011 }));
1012 }
1013 },
1014 Err(e) => {
1015 error!("Failed to read post_run field: {}", e);
1016 return Json(json!({
1017 "status": "error",
1018 "error": format!("Failed to read post_run: {}", e)
1019 }));
1020 }
1021 }
1022 }
1023
1024 let file_name = field.file_name().unwrap_or("unknown").to_string();
1025 let content_type = field
1026 .content_type()
1027 .unwrap_or("application/octet-stream")
1028 .to_string();
1029
1030 info!(
1031 "Processing file: field_name={}, file_name={}, content_type={}",
1032 field_name, file_name, content_type
1033 );
1034
1035 match field.bytes().await {
1036 Ok(data) => {
1037 let size = data.len();
1038 total_bytes += size as u64;
1039 let Ok(size_i64) = i64::try_from(size) else {
1040 error!("File too large to store size in database: {} bytes", size);
1041 return Json(json!({
1042 "status": "error",
1043 "error": "File too large to store size in database"
1044 }));
1045 };
1046
1047 let mut hasher = Sha256::new();
1048 hasher.update(&data);
1049 let hash = format!("{:x}", hasher.finalize());
1050
1051 info!("File hash: {}, size: {} bytes", hash, size);
1052
1053 let hash_dir = &hash[0..2];
1054 let file_storage_dir = storage_path.join(hash_dir);
1055 if let Err(e) = tokio::fs::create_dir_all(&file_storage_dir).await {
1056 error!(
1057 "Failed to create hash directory at {}: {}",
1058 file_storage_dir.display(),
1059 e
1060 );
1061 return Json(json!({
1062 "status": "error",
1063 "error": format!("Failed to create storage directory: {}", e)
1064 }));
1065 }
1066
1067 let file_storage_path = file_storage_dir.join(&hash);
1068 let storage_path_str = file_storage_path.to_string_lossy().to_string();
1069
1070 if file_storage_path.exists() {
1071 info!("File already exists (deduplicated): {}", storage_path_str);
1072 } else {
1073 if let Err(e) = tokio::fs::write(&file_storage_path, &data).await {
1074 error!("Failed to write file: {}", e);
1075 return Json(json!({
1076 "status": "error",
1077 "error": format!("Failed to write file: {}", e)
1078 }));
1079 }
1080 info!("Saved new file: {}", storage_path_str);
1081 }
1082
1083 if let Some(ref rid) = run_id {
1084 let relative_path = pending_paths
1085 .pop_front()
1086 .or_else(|| {
1087 if file_name == "unknown" {
1088 None
1089 } else {
1090 Some(file_name.clone())
1091 }
1092 })
1093 .unwrap_or_else(|| {
1094 if field_name != "unknown" && field_name != "file" {
1095 field_name.clone()
1096 } else {
1097 format!("file-{}", files_processed + 1)
1098 }
1099 });
1100
1101 let result = sqlx::query!(
1102 r#"
1103 INSERT INTO captured_files (run_id, path, size, hash, storage_path, content_type)
1104 VALUES ($1, $2, $3, $4, $5, $6)
1105 ON CONFLICT (run_id, path) DO UPDATE
1106 SET size = EXCLUDED.size,
1107 hash = EXCLUDED.hash,
1108 storage_path = EXCLUDED.storage_path,
1109 content_type = EXCLUDED.content_type
1110 "#,
1111 rid,
1112 relative_path,
1113 size_i64,
1114 hash,
1115 storage_path_str,
1116 content_type
1117 )
1118 .execute(&state.pool)
1119 .await;
1120
1121 match result {
1122 Ok(_) => {
1123 info!("Stored file metadata in database");
1124 }
1125 Err(e) => {
1126 error!("Failed to store file metadata: {}", e);
1127 return Json(json!({
1128 "status": "error",
1129 "error": format!("Failed to store file metadata: {}", e)
1130 }));
1131 }
1132 }
1133 }
1134
1135 files_processed += 1;
1136 info!("Successfully processed file: {} bytes", size);
1137 }
1138 Err(e) => {
1139 error!("Failed to read file field: {}", e);
1140 return Json(json!({
1141 "status": "error",
1142 "error": format!("Failed to read file: {}", e)
1143 }));
1144 }
1145 }
1146 }
1147
1148 info!(
1149 "Upload complete: {} files, {} bytes total",
1150 files_processed, total_bytes
1151 );
1152
1153 let mut pre_run_count = 0;
1155 let mut post_run_count = 0;
1156
1157 if let Some(ref rid) = run_id {
1158 if let Some(hooks) = pre_run_hooks {
1159 for hook in hooks {
1160 let result = sqlx::query!(
1161 r#"
1162 INSERT INTO run_outputs (run_id, phase, hook_id, config, output, success, error)
1163 VALUES ($1, 'pre', $2, $3, $4, $5, $6)
1164 "#,
1165 rid,
1166 hook.meta.id,
1167 hook.meta.config,
1168 hook.output,
1169 hook.meta.success,
1170 hook.meta.error
1171 )
1172 .execute(&state.pool)
1173 .await;
1174
1175 match result {
1176 Ok(_) => {
1177 pre_run_count += 1;
1178 info!("Stored pre-run hook: {}", hook.meta.id);
1179 }
1180 Err(e) => {
1181 error!("Failed to store pre-run hook {}: {}", hook.meta.id, e);
1182 return Json(json!({
1183 "status": "error",
1184 "error": format!("Failed to store pre-run hook {}: {}", hook.meta.id, e)
1185 }));
1186 }
1187 }
1188 }
1189 }
1190
1191 if let Some(hooks) = post_run_hooks {
1192 for hook in hooks {
1193 let result = sqlx::query!(
1194 r#"
1195 INSERT INTO run_outputs (run_id, phase, hook_id, config, output, success, error)
1196 VALUES ($1, 'post', $2, $3, $4, $5, $6)
1197 "#,
1198 rid,
1199 hook.meta.id,
1200 hook.meta.config,
1201 hook.output,
1202 hook.meta.success,
1203 hook.meta.error
1204 )
1205 .execute(&state.pool)
1206 .await;
1207
1208 match result {
1209 Ok(_) => {
1210 post_run_count += 1;
1211 info!("Stored post-run hook: {}", hook.meta.id);
1212 }
1213 Err(e) => {
1214 error!("Failed to store post-run hook {}: {}", hook.meta.id, e);
1215 return Json(json!({
1216 "status": "error",
1217 "error": format!("Failed to store post-run hook {}: {}", hook.meta.id, e)
1218 }));
1219 }
1220 }
1221 }
1222 }
1223 }
1224
1225 info!(
1226 "Hook outputs stored: {} pre-run, {} post-run",
1227 pre_run_count, post_run_count
1228 );
1229
1230 let response = capsula_api_types::UploadResponse {
1231 status: "ok".to_string(),
1232 files_processed,
1233 total_bytes,
1234 pre_run_hooks: pre_run_count,
1235 post_run_hooks: post_run_count,
1236 };
1237 Json(serde_json::to_value(response).expect("Failed to serialize UploadResponse"))
1238}
1239
1240async fn download_file(
1241 State(state): State<AppState>,
1242 Path((run_id, file_path)): Path<(String, String)>,
1243) -> Result<Response, StatusCode> {
1244 info!("Downloading file: run_id={}, path={}", run_id, file_path);
1245
1246 let file_record = sqlx::query!(
1248 r#"
1249 SELECT storage_path, content_type, path as file_path
1250 FROM captured_files
1251 WHERE run_id = $1 AND path = $2
1252 "#,
1253 run_id,
1254 file_path
1255 )
1256 .fetch_optional(&state.pool)
1257 .await
1258 .map_err(|e| {
1259 error!("Database error while fetching file metadata: {}", e);
1260 StatusCode::INTERNAL_SERVER_ERROR
1261 })?;
1262
1263 let Some(record) = file_record else {
1264 info!("File not found: run_id={}, path={}", run_id, file_path);
1265 return Err(StatusCode::NOT_FOUND);
1266 };
1267
1268 let file_data = tokio::fs::read(&record.storage_path).await.map_err(|e| {
1270 error!(
1271 "Failed to read file from storage {}: {}",
1272 record.storage_path, e
1273 );
1274 StatusCode::INTERNAL_SERVER_ERROR
1275 })?;
1276
1277 info!(
1278 "Successfully read file: {} bytes from {}",
1279 file_data.len(),
1280 record.storage_path
1281 );
1282
1283 let content_type = record.content_type.unwrap_or_else(|| {
1285 mime_guess::from_path(&record.file_path)
1286 .first_or_octet_stream()
1287 .to_string()
1288 });
1289
1290 let filename = std::path::Path::new(&record.file_path)
1292 .file_name()
1293 .and_then(|n| n.to_str())
1294 .unwrap_or("download");
1295
1296 Response::builder()
1298 .status(StatusCode::OK)
1299 .header(header::CONTENT_TYPE, content_type)
1300 .header(
1301 header::CONTENT_DISPOSITION,
1302 format!("inline; filename=\"{filename}\""),
1303 )
1304 .body(Body::from(file_data))
1305 .map_err(|e| {
1306 error!("Failed to build response: {}", e);
1307 StatusCode::INTERNAL_SERVER_ERROR
1308 })
1309}