1use askama::Template;
2use askama_web::WebTemplate;
3
4#[expect(
5 clippy::unnecessary_wraps,
6 reason = "Askama filter_fn macro generates code that triggers this lint, but Result return type is required by the Askama filter API"
7)]
8mod filters {
9 use askama::filter_fn;
10
11 #[filter_fn]
12 pub fn format_command(s: &str, _env: &dyn askama::Values) -> ::askama::Result<String> {
13 Ok(serde_json::from_str::<Vec<String>>(s).map_or_else(
15 |_| s.to_string(),
16 |cmd_array| {
17 shlex::try_join(cmd_array.iter().map(String::as_str))
18 .unwrap_or_else(|_| cmd_array.join(" "))
19 },
20 ))
21 }
22
23 #[filter_fn]
24 pub fn pretty_json<T: std::fmt::Display>(
25 s: T,
26 _env: &dyn askama::Values,
27 ) -> ::askama::Result<String> {
28 let s_str = s.to_string();
29 Ok(serde_json::from_str::<serde_json::Value>(&s_str)
31 .ok()
32 .and_then(|value| serde_json::to_string_pretty(&value).ok())
33 .unwrap_or(s_str))
34 }
35}
36
37use axum::{
38 Router,
39 body::Body,
40 extract::{Multipart, Path, Query, State},
41 http::{StatusCode, header},
42 response::{IntoResponse, Json, Response},
43 routing::{get, post},
44};
45use capsula_api_types::VaultInfo;
46use serde_json::json;
47use sha2::{Digest, Sha256};
48use sqlx::PgPool;
49use std::collections::VecDeque;
50use std::path::PathBuf;
51use tower_http::services::ServeDir;
52use tracing::{error, info, warn};
53
/// Shared state handed to the request handlers through axum's `State` extractor.
#[derive(Clone)]
pub struct AppState {
    /// PostgreSQL connection pool the handlers run their queries against.
    pub pool: PgPool,
    /// Root directory under which uploaded file contents are written.
    pub storage_path: PathBuf,
}
59
60mod models;
61
/// Askama template rendered from `index.html`; it takes no data.
#[derive(Template, WebTemplate)]
#[template(path = "index.html")]
struct IndexTemplate;
65
/// Askama template rendered from `vaults.html`.
#[derive(Template, WebTemplate)]
#[template(path = "vaults.html")]
struct VaultsTemplate {
    /// Vaults made available to the template.
    vaults: Vec<VaultInfo>,
}
71
/// Askama template rendered from `runs.html`: one page of the run listing.
#[derive(Template, WebTemplate)]
#[template(path = "runs.html")]
struct RunsTemplate {
    /// Runs shown on the current page.
    runs: Vec<models::Run>,
    /// Vault the listing is filtered by, if any.
    vault: Option<String>,
    /// Current page number (the page handler starts counting at 1).
    page: i64,
    /// Total number of pages for the current filter.
    total_pages: i64,
}
80
/// Askama template rendered from `run_detail.html`: everything known about one run.
#[derive(Template, WebTemplate)]
#[template(path = "run_detail.html")]
struct RunDetailTemplate {
    /// The run being displayed.
    run: models::Run,
    /// Hook outputs recorded with phase `pre`.
    pre_run_hooks: Vec<models::HookOutput>,
    /// Hook outputs recorded with phase `post`.
    post_run_hooks: Vec<models::HookOutput>,
    /// Files captured for the run.
    files: Vec<models::CapturedFile>,
}
89
/// Askama template rendered from `error.html`.
#[derive(Template, WebTemplate)]
#[template(path = "error.html")]
struct ErrorTemplate {
    /// Numeric HTTP status made available to the template.
    status_code: u16,
    /// Short heading for the error.
    title: String,
    /// Longer explanation of the error.
    message: String,
}
97
98pub async fn create_pool(database_url: &str, max_connections: u32) -> Result<PgPool, sqlx::Error> {
99 sqlx::postgres::PgPoolOptions::new()
100 .max_connections(max_connections)
101 .acquire_timeout(std::time::Duration::from_secs(3))
102 .connect(database_url)
103 .await
104}
105
106pub fn build_app(pool: PgPool, storage_path: PathBuf) -> Router {
107 let static_dir: PathBuf = std::env::var("CAPSULA_STATIC_DIR")
108 .expect("CAPSULA_STATIC_DIR environment variable must be set")
109 .into();
110
111 let state = AppState { pool, storage_path };
112
113 Router::new()
114 .route("/", get(index))
115 .route("/vaults", get(vaults_page))
116 .route("/runs", get(runs_page))
117 .route("/runs/{id}", get(run_detail_page))
118 .route("/health", get(health_check))
119 .route("/api/v1/vaults", get(list_vaults))
120 .route("/api/v1/vaults/{name}", get(get_vault_info))
121 .route("/api/v1/runs", post(create_run).get(list_runs))
122 .route("/api/v1/runs/{id}", get(get_run))
123 .route("/api/v1/runs/{id}/files/{*path}", get(download_file))
124 .route("/api/v1/upload", post(upload_files))
125 .nest_service("/static", ServeDir::new(static_dir))
126 .fallback(not_found)
127 .with_state(state)
128}
129
/// Handler for `GET /`: responds with the `index.html` template.
async fn index() -> impl IntoResponse {
    IndexTemplate
}
133
134async fn not_found() -> impl IntoResponse {
135 (
136 StatusCode::NOT_FOUND,
137 ErrorTemplate {
138 status_code: 404,
139 title: "Page Not Found".to_string(),
140 message: "The page you are looking for does not exist.".to_string(),
141 },
142 )
143}
144
145async fn vaults_page(State(state): State<AppState>) -> impl IntoResponse {
146 info!("Rendering vaults page");
147
148 let result = sqlx::query!(
149 r#"
150 SELECT vault as name, COUNT(*) as "run_count!"
151 FROM runs
152 GROUP BY vault
153 ORDER BY vault
154 "#
155 )
156 .fetch_all(&state.pool)
157 .await;
158
159 match result {
160 Ok(rows) => {
161 let vaults: Vec<VaultInfo> = rows
162 .into_iter()
163 .map(|row| VaultInfo {
164 name: row.name,
165 run_count: row.run_count,
166 })
167 .collect();
168 VaultsTemplate { vaults }
169 }
170 Err(e) => {
171 error!("Failed to fetch vaults: {}", e);
172 VaultsTemplate { vaults: Vec::new() }
174 }
175 }
176}
177
178async fn runs_page(
179 State(state): State<AppState>,
180 Query(params): Query<models::ListRunsQuery>,
181) -> impl IntoResponse {
182 let page = params.offset.unwrap_or(0) / params.limit.unwrap_or(50) + 1;
183 let limit = params.limit.unwrap_or(50);
184 let offset = (page - 1) * limit;
185
186 info!(
187 "Rendering runs page: page={}, vault={:?}",
188 page, params.vault
189 );
190
191 let total_count = if let Some(ref vault) = params.vault {
193 sqlx::query_scalar!(
194 r#"
195 SELECT COUNT(*)::bigint
196 FROM runs
197 WHERE vault = $1
198 "#,
199 vault
200 )
201 .fetch_one(&state.pool)
202 .await
203 .unwrap_or(Some(0))
204 .unwrap_or(0)
205 } else {
206 sqlx::query_scalar!(
207 r#"
208 SELECT COUNT(*)::bigint
209 FROM runs
210 "#
211 )
212 .fetch_one(&state.pool)
213 .await
214 .unwrap_or(Some(0))
215 .unwrap_or(0)
216 };
217
218 let total_pages = (total_count + limit - 1) / limit;
219
220 let runs_result = if let Some(ref vault) = params.vault {
222 sqlx::query_as!(
223 models::Run,
224 r#"
225 SELECT id, name, timestamp, command, vault, project_root,
226 exit_code, duration_ms, stdout, stderr,
227 created_at, updated_at
228 FROM runs
229 WHERE vault = $1
230 ORDER BY timestamp DESC
231 LIMIT $2 OFFSET $3
232 "#,
233 vault,
234 limit,
235 offset
236 )
237 .fetch_all(&state.pool)
238 .await
239 } else {
240 sqlx::query_as!(
241 models::Run,
242 r#"
243 SELECT id, name, timestamp, command, vault, project_root,
244 exit_code, duration_ms, stdout, stderr,
245 created_at, updated_at
246 FROM runs
247 ORDER BY timestamp DESC
248 LIMIT $1 OFFSET $2
249 "#,
250 limit,
251 offset
252 )
253 .fetch_all(&state.pool)
254 .await
255 };
256
257 match runs_result {
258 Ok(runs) => RunsTemplate {
259 runs,
260 vault: params.vault,
261 page,
262 total_pages,
263 },
264 Err(e) => {
265 error!("Failed to fetch runs: {}", e);
266 RunsTemplate {
267 runs: Vec::new(),
268 vault: params.vault,
269 page: 1,
270 total_pages: 1,
271 }
272 }
273 }
274}
275
276async fn run_detail_page(
277 State(state): State<AppState>,
278 Path(id): Path<String>,
279) -> Result<RunDetailTemplate, StatusCode> {
280 info!("Rendering run detail page for: {}", id);
281
282 let run = sqlx::query_as!(
284 models::Run,
285 r#"
286 SELECT id, name, timestamp, command, vault, project_root,
287 exit_code, duration_ms, stdout, stderr,
288 created_at, updated_at
289 FROM runs
290 WHERE id = $1
291 "#,
292 id
293 )
294 .fetch_optional(&state.pool)
295 .await
296 .map_err(|e| {
297 error!("Database error while fetching run: {}", e);
298 StatusCode::INTERNAL_SERVER_ERROR
299 })?
300 .ok_or_else(|| {
301 info!("Run not found: {}", id);
302 StatusCode::NOT_FOUND
303 })?;
304
305 let hook_outputs_result = sqlx::query_as!(
307 models::RunOutputRow,
308 r#"
309 SELECT phase, hook_id, config, output, success, error
310 FROM run_outputs
311 WHERE run_id = $1
312 ORDER BY id
313 "#,
314 id
315 )
316 .fetch_all(&state.pool)
317 .await;
318
319 let (pre_run_hooks, post_run_hooks) = match hook_outputs_result {
320 Ok(rows) => {
321 let mut pre_hooks = Vec::new();
322 let mut post_hooks = Vec::new();
323
324 for row in rows {
325 let hook_output = models::HookOutput {
326 meta: models::HookMeta {
327 id: row.hook_id,
328 config: row.config,
329 success: row.success,
330 error: row.error,
331 },
332 output: row.output,
333 };
334
335 if row.phase == "pre" {
336 pre_hooks.push(hook_output);
337 } else if row.phase == "post" {
338 post_hooks.push(hook_output);
339 } else {
340 warn!("Unknown hook phase: {}", row.phase);
341 }
342 }
343
344 (pre_hooks, post_hooks)
345 }
346 Err(e) => {
347 error!("Failed to fetch hook outputs: {}", e);
348 (Vec::new(), Vec::new())
349 }
350 };
351
352 let files_result = sqlx::query_as!(
354 models::CapturedFile,
355 r#"
356 SELECT path, size, hash, storage_path, content_type
357 FROM captured_files
358 WHERE run_id = $1
359 ORDER BY path
360 "#,
361 id
362 )
363 .fetch_all(&state.pool)
364 .await;
365
366 let files = match files_result {
367 Ok(files) => files,
368 Err(e) => {
369 error!("Failed to fetch captured files: {}", e);
370 Vec::new()
371 }
372 };
373
374 Ok(RunDetailTemplate {
375 run,
376 pre_run_hooks,
377 post_run_hooks,
378 files,
379 })
380}
381
382async fn health_check(State(state): State<AppState>) -> impl IntoResponse {
383 match sqlx::query("SELECT 1").fetch_one(&state.pool).await {
384 Ok(_) => Json(json!({
385 "status": "ok",
386 "database": "connected"
387 })),
388 Err(e) => Json(json!({
389 "status": "error",
390 "database": "disconnected",
391 "error": e.to_string()
392 })),
393 }
394}
395
396async fn list_vaults(State(state): State<AppState>) -> impl IntoResponse {
397 info!("Listing all vaults");
398
399 let result = sqlx::query!(
400 r#"
401 SELECT vault as name, COUNT(*) as "run_count!"
402 FROM runs
403 GROUP BY vault
404 ORDER BY vault
405 "#
406 )
407 .fetch_all(&state.pool)
408 .await;
409
410 match result {
411 Ok(rows) => {
412 let vaults: Vec<VaultInfo> = rows
413 .into_iter()
414 .map(|row| VaultInfo {
415 name: row.name,
416 run_count: row.run_count,
417 })
418 .collect();
419 info!("Found {} vaults", vaults.len());
420 let response = capsula_api_types::VaultsResponse {
421 status: "ok".to_string(),
422 vaults,
423 };
424 Json(serde_json::to_value(response).expect("Failed to serialize VaultsResponse"))
425 }
426 Err(e) => {
427 error!("Failed to list vaults: {}", e);
428 let response = capsula_api_types::ErrorResponse {
429 status: "error".to_string(),
430 error: e.to_string(),
431 };
432 Json(serde_json::to_value(response).expect("Failed to serialize ErrorResponse"))
433 }
434 }
435}
436
437async fn get_vault_info(
438 State(state): State<AppState>,
439 Path(name): Path<String>,
440) -> impl IntoResponse {
441 info!("Getting vault info: {}", name);
442
443 let result = sqlx::query!(
444 r#"
445 SELECT vault as name, COUNT(*) as "run_count!"
446 FROM runs
447 WHERE vault = $1
448 GROUP BY vault
449 "#,
450 name
451 )
452 .fetch_optional(&state.pool)
453 .await;
454
455 match result {
456 Ok(Some(row)) => {
457 let vault = VaultInfo {
458 name: row.name,
459 run_count: row.run_count,
460 };
461 info!("Found vault: {} with {} runs", vault.name, vault.run_count);
462 let response = capsula_api_types::VaultExistsResponse {
463 status: "ok".to_string(),
464 exists: true,
465 vault: Some(vault),
466 };
467 Json(serde_json::to_value(response).expect("Failed to serialize VaultExistsResponse"))
468 }
469 Ok(None) => {
470 info!("Vault not found: {}", name);
471 let response = capsula_api_types::VaultExistsResponse {
472 status: "ok".to_string(),
473 exists: false,
474 vault: None,
475 };
476 Json(serde_json::to_value(response).expect("Failed to serialize VaultExistsResponse"))
477 }
478 Err(e) => {
479 error!("Failed to get vault info: {}", e);
480 let response = capsula_api_types::ErrorResponse {
481 status: "error".to_string(),
482 error: e.to_string(),
483 };
484 Json(serde_json::to_value(response).expect("Failed to serialize ErrorResponse"))
485 }
486 }
487}
488
489async fn list_runs(
490 State(state): State<AppState>,
491 Query(params): Query<models::ListRunsQuery>,
492) -> impl IntoResponse {
493 let limit = params.limit.unwrap_or(100);
494 let offset = params.offset.unwrap_or(0);
495
496 if let Some(ref vault) = params.vault {
497 info!(
498 "Listing runs for vault: {} (limit={}, offset={})",
499 vault, limit, offset
500 );
501 } else {
502 info!("Listing all runs (limit={}, offset={})", limit, offset);
503 }
504
505 let result = if let Some(vault) = params.vault {
506 sqlx::query_as!(
507 models::Run,
508 r#"
509 SELECT id, name, timestamp, command, vault, project_root,
510 exit_code, duration_ms, stdout, stderr,
511 created_at, updated_at
512 FROM runs
513 WHERE vault = $1
514 ORDER BY timestamp DESC
515 LIMIT $2 OFFSET $3
516 "#,
517 vault,
518 limit,
519 offset
520 )
521 .fetch_all(&state.pool)
522 .await
523 } else {
524 sqlx::query_as!(
525 models::Run,
526 r#"
527 SELECT id, name, timestamp, command, vault, project_root,
528 exit_code, duration_ms, stdout, stderr,
529 created_at, updated_at
530 FROM runs
531 ORDER BY timestamp DESC
532 LIMIT $1 OFFSET $2
533 "#,
534 limit,
535 offset
536 )
537 .fetch_all(&state.pool)
538 .await
539 };
540
541 match result {
542 Ok(runs) => {
543 info!("Found {} runs", runs.len());
544 Json(json!({
545 "status": "ok",
546 "runs": runs,
547 "limit": limit,
548 "offset": offset
549 }))
550 }
551 Err(e) => {
552 error!("Failed to list runs: {}", e);
553 Json(json!({
554 "status": "error",
555 "error": e.to_string()
556 }))
557 }
558 }
559}
560
561async fn create_run(
562 State(state): State<AppState>,
563 Json(request): Json<models::CreateRunRequest>,
564) -> impl IntoResponse {
565 info!("Creating run: {}", request.id);
566
567 let result = sqlx::query!(
568 r#"
569 INSERT INTO runs (
570 id, name, timestamp, command, vault, project_root,
571 exit_code, duration_ms, stdout, stderr
572 ) VALUES (
573 $1, $2, $3, $4, $5, $6,
574 $7, $8, $9, $10
575 )
576 "#,
577 request.id,
578 request.name,
579 request.timestamp,
580 request.command,
581 request.vault,
582 request.project_root,
583 request.exit_code,
584 request.duration_ms,
585 request.stdout,
586 request.stderr
587 )
588 .execute(&state.pool)
589 .await;
590
591 match result {
592 Ok(_) => {
593 info!("Run created successfully");
594 (
595 StatusCode::CREATED,
596 Json(json!({
597 "status": "created",
598 "run": request
599 })),
600 )
601 }
602 Err(e) => {
603 error!("Failed to insert run: {}", e);
604 let status = if e.to_string().contains("duplicate key") {
605 StatusCode::CONFLICT
606 } else {
607 StatusCode::INTERNAL_SERVER_ERROR
608 };
609 (
610 status,
611 Json(json!({
612 "status": "error",
613 "error": e.to_string()
614 })),
615 )
616 }
617 }
618}
619
620async fn get_run(State(state): State<AppState>, Path(id): Path<String>) -> impl IntoResponse {
621 info!("Getting run: {}", id);
622
623 let result = sqlx::query_as!(
624 models::Run,
625 r#"
626 SELECT id, name, timestamp, command, vault, project_root,
627 exit_code, duration_ms, stdout, stderr,
628 created_at, updated_at
629 FROM runs
630 WHERE id = $1
631 "#,
632 id
633 )
634 .fetch_optional(&state.pool)
635 .await;
636
637 match result {
638 Ok(Some(run)) => {
639 info!("Found run: {}", run.id);
640
641 let hook_outputs_result = sqlx::query_as!(
643 models::RunOutputRow,
644 r#"
645 SELECT phase, hook_id, config, output, success, error
646 FROM run_outputs
647 WHERE run_id = $1
648 ORDER BY id
649 "#,
650 id
651 )
652 .fetch_all(&state.pool)
653 .await;
654
655 let (pre_run_hooks, post_run_hooks) = match hook_outputs_result {
656 Ok(rows) => {
657 let mut pre_hooks = Vec::new();
658 let mut post_hooks = Vec::new();
659
660 for row in rows {
661 let hook_output = models::HookOutput {
662 meta: models::HookMeta {
663 id: row.hook_id,
664 config: row.config,
665 success: row.success,
666 error: row.error,
667 },
668 output: row.output,
669 };
670
671 if row.phase == "pre" {
672 pre_hooks.push(hook_output);
673 } else if row.phase == "post" {
674 post_hooks.push(hook_output);
675 } else {
676 warn!("Unknown hook phase: {}", row.phase);
677 }
678 }
679
680 info!(
681 "Found {} pre-run hooks and {} post-run hooks",
682 pre_hooks.len(),
683 post_hooks.len()
684 );
685 (pre_hooks, post_hooks)
686 }
687 Err(e) => {
688 error!("Failed to fetch hook outputs: {}", e);
689 (Vec::new(), Vec::new())
690 }
691 };
692
693 Json(json!({
694 "status": "ok",
695 "run": run,
696 "pre_run_hooks": pre_run_hooks,
697 "post_run_hooks": post_run_hooks
698 }))
699 }
700 Ok(None) => {
701 info!("Run not found: {}", id);
702 Json(json!({
703 "status": "not_found",
704 "error": format!("Run with id {} not found", id)
705 }))
706 }
707 Err(e) => {
708 error!("Failed to retrieve run: {}", e);
709 Json(json!({
710 "status": "error",
711 "error": e.to_string()
712 }))
713 }
714 }
715}
716
717#[expect(clippy::too_many_lines, reason = "TODO: Refactor later")]
718#[expect(
719 clippy::else_if_without_else,
720 reason = "There is `continue` or `return` in each branch, so `else` is redundant"
721)]
722async fn upload_files(
723 State(state): State<AppState>,
724 mut multipart: Multipart,
725) -> impl IntoResponse {
726 let storage_path = &state.storage_path;
727 info!("Received file upload request");
728
729 if let Err(e) = tokio::fs::create_dir_all(&storage_path).await {
730 error!("Failed to create storage directory: {}", e);
731 return Json(json!({
732 "status": "error",
733 "error": format!("Failed to create storage directory: {}", e)
734 }));
735 }
736
737 let mut files_processed = 0;
738 let mut total_bytes = 0u64;
739 let mut run_id: Option<String> = None;
740 let mut pending_paths: VecDeque<String> = VecDeque::new();
741 let mut pre_run_hooks: Option<Vec<models::HookOutput>> = None;
742 let mut post_run_hooks: Option<Vec<models::HookOutput>> = None;
743
744 while let Ok(Some(field)) = multipart.next_field().await {
745 let field_name = field.name().unwrap_or("unknown").to_string();
746
747 if field_name == "run_id" {
748 match field.text().await {
749 Ok(text) => {
750 run_id = Some(text);
751 continue;
752 }
753 Err(e) => {
754 error!("Failed to read run_id field: {}", e);
755 return Json(json!({
756 "status": "error",
757 "error": format!("Failed to read run_id: {}", e)
758 }));
759 }
760 }
761 } else if field_name == "path" {
762 match field.text().await {
763 Ok(text) => {
764 pending_paths.push_back(text);
765 continue;
766 }
767 Err(e) => {
768 error!("Failed to read path field: {}", e);
769 return Json(json!({
770 "status": "error",
771 "error": format!("Failed to read path: {}", e)
772 }));
773 }
774 }
775 } else if field_name == "pre_run" {
776 match field.text().await {
777 Ok(text) => match serde_json::from_str::<Vec<models::HookOutput>>(&text) {
778 Ok(hooks) => {
779 info!("Parsed {} pre-run hooks", hooks.len());
780 pre_run_hooks = Some(hooks);
781 continue;
782 }
783 Err(e) => {
784 error!("Failed to parse pre_run JSON: {}", e);
785 return Json(json!({
786 "status": "error",
787 "error": format!("Failed to parse pre_run JSON: {}", e)
788 }));
789 }
790 },
791 Err(e) => {
792 error!("Failed to read pre_run field: {}", e);
793 return Json(json!({
794 "status": "error",
795 "error": format!("Failed to read pre_run: {}", e)
796 }));
797 }
798 }
799 } else if field_name == "post_run" {
800 match field.text().await {
801 Ok(text) => match serde_json::from_str::<Vec<models::HookOutput>>(&text) {
802 Ok(hooks) => {
803 info!("Parsed {} post-run hooks", hooks.len());
804 post_run_hooks = Some(hooks);
805 continue;
806 }
807 Err(e) => {
808 error!("Failed to parse post_run JSON: {}", e);
809 return Json(json!({
810 "status": "error",
811 "error": format!("Failed to parse post_run JSON: {}", e)
812 }));
813 }
814 },
815 Err(e) => {
816 error!("Failed to read post_run field: {}", e);
817 return Json(json!({
818 "status": "error",
819 "error": format!("Failed to read post_run: {}", e)
820 }));
821 }
822 }
823 }
824
825 let file_name = field.file_name().unwrap_or("unknown").to_string();
826 let content_type = field
827 .content_type()
828 .unwrap_or("application/octet-stream")
829 .to_string();
830
831 info!(
832 "Processing file: field_name={}, file_name={}, content_type={}",
833 field_name, file_name, content_type
834 );
835
836 match field.bytes().await {
837 Ok(data) => {
838 let size = data.len();
839 total_bytes += size as u64;
840 let Ok(size_i64) = i64::try_from(size) else {
841 error!("File too large to store size in database: {} bytes", size);
842 return Json(json!({
843 "status": "error",
844 "error": "File too large to store size in database"
845 }));
846 };
847
848 let mut hasher = Sha256::new();
849 hasher.update(&data);
850 let hash = format!("{:x}", hasher.finalize());
851
852 info!("File hash: {}, size: {} bytes", hash, size);
853
854 let hash_dir = &hash[0..2];
855 let file_storage_dir = storage_path.join(hash_dir);
856 if let Err(e) = tokio::fs::create_dir_all(&file_storage_dir).await {
857 error!("Failed to create hash directory: {}", e);
858 return Json(json!({
859 "status": "error",
860 "error": format!("Failed to create storage directory: {}", e)
861 }));
862 }
863
864 let file_storage_path = file_storage_dir.join(&hash);
865 let storage_path_str = file_storage_path.to_string_lossy().to_string();
866
867 if file_storage_path.exists() {
868 info!("File already exists (deduplicated): {}", storage_path_str);
869 } else {
870 if let Err(e) = tokio::fs::write(&file_storage_path, &data).await {
871 error!("Failed to write file: {}", e);
872 return Json(json!({
873 "status": "error",
874 "error": format!("Failed to write file: {}", e)
875 }));
876 }
877 info!("Saved new file: {}", storage_path_str);
878 }
879
880 if let Some(ref rid) = run_id {
881 let relative_path = pending_paths
882 .pop_front()
883 .or_else(|| {
884 if file_name == "unknown" {
885 None
886 } else {
887 Some(file_name.clone())
888 }
889 })
890 .unwrap_or_else(|| {
891 if field_name != "unknown" && field_name != "file" {
892 field_name.clone()
893 } else {
894 format!("file-{}", files_processed + 1)
895 }
896 });
897
898 let result = sqlx::query!(
899 r#"
900 INSERT INTO captured_files (run_id, path, size, hash, storage_path, content_type)
901 VALUES ($1, $2, $3, $4, $5, $6)
902 ON CONFLICT (run_id, path) DO UPDATE
903 SET size = EXCLUDED.size,
904 hash = EXCLUDED.hash,
905 storage_path = EXCLUDED.storage_path,
906 content_type = EXCLUDED.content_type
907 "#,
908 rid,
909 relative_path,
910 size_i64,
911 hash,
912 storage_path_str,
913 content_type
914 )
915 .execute(&state.pool)
916 .await;
917
918 match result {
919 Ok(_) => {
920 info!("Stored file metadata in database");
921 }
922 Err(e) => {
923 error!("Failed to store file metadata: {}", e);
924 return Json(json!({
925 "status": "error",
926 "error": format!("Failed to store file metadata: {}", e)
927 }));
928 }
929 }
930 }
931
932 files_processed += 1;
933 info!("Successfully processed file: {} bytes", size);
934 }
935 Err(e) => {
936 error!("Failed to read file field: {}", e);
937 return Json(json!({
938 "status": "error",
939 "error": format!("Failed to read file: {}", e)
940 }));
941 }
942 }
943 }
944
945 info!(
946 "Upload complete: {} files, {} bytes total",
947 files_processed, total_bytes
948 );
949
950 let mut pre_run_count = 0;
952 let mut post_run_count = 0;
953
954 if let Some(ref rid) = run_id {
955 if let Some(hooks) = pre_run_hooks {
956 for hook in hooks {
957 let result = sqlx::query!(
958 r#"
959 INSERT INTO run_outputs (run_id, phase, hook_id, config, output, success, error)
960 VALUES ($1, 'pre', $2, $3, $4, $5, $6)
961 "#,
962 rid,
963 hook.meta.id,
964 hook.meta.config,
965 hook.output,
966 hook.meta.success,
967 hook.meta.error
968 )
969 .execute(&state.pool)
970 .await;
971
972 match result {
973 Ok(_) => {
974 pre_run_count += 1;
975 info!("Stored pre-run hook: {}", hook.meta.id);
976 }
977 Err(e) => {
978 error!("Failed to store pre-run hook {}: {}", hook.meta.id, e);
979 return Json(json!({
980 "status": "error",
981 "error": format!("Failed to store pre-run hook {}: {}", hook.meta.id, e)
982 }));
983 }
984 }
985 }
986 }
987
988 if let Some(hooks) = post_run_hooks {
989 for hook in hooks {
990 let result = sqlx::query!(
991 r#"
992 INSERT INTO run_outputs (run_id, phase, hook_id, config, output, success, error)
993 VALUES ($1, 'post', $2, $3, $4, $5, $6)
994 "#,
995 rid,
996 hook.meta.id,
997 hook.meta.config,
998 hook.output,
999 hook.meta.success,
1000 hook.meta.error
1001 )
1002 .execute(&state.pool)
1003 .await;
1004
1005 match result {
1006 Ok(_) => {
1007 post_run_count += 1;
1008 info!("Stored post-run hook: {}", hook.meta.id);
1009 }
1010 Err(e) => {
1011 error!("Failed to store post-run hook {}: {}", hook.meta.id, e);
1012 return Json(json!({
1013 "status": "error",
1014 "error": format!("Failed to store post-run hook {}: {}", hook.meta.id, e)
1015 }));
1016 }
1017 }
1018 }
1019 }
1020 }
1021
1022 info!(
1023 "Hook outputs stored: {} pre-run, {} post-run",
1024 pre_run_count, post_run_count
1025 );
1026
1027 let response = capsula_api_types::UploadResponse {
1028 status: "ok".to_string(),
1029 files_processed,
1030 total_bytes,
1031 pre_run_hooks: pre_run_count,
1032 post_run_hooks: post_run_count,
1033 };
1034 Json(serde_json::to_value(response).expect("Failed to serialize UploadResponse"))
1035}
1036
1037async fn download_file(
1038 State(state): State<AppState>,
1039 Path((run_id, file_path)): Path<(String, String)>,
1040) -> Result<Response, StatusCode> {
1041 info!("Downloading file: run_id={}, path={}", run_id, file_path);
1042
1043 let file_record = sqlx::query!(
1045 r#"
1046 SELECT storage_path, content_type, path as file_path
1047 FROM captured_files
1048 WHERE run_id = $1 AND path = $2
1049 "#,
1050 run_id,
1051 file_path
1052 )
1053 .fetch_optional(&state.pool)
1054 .await
1055 .map_err(|e| {
1056 error!("Database error while fetching file metadata: {}", e);
1057 StatusCode::INTERNAL_SERVER_ERROR
1058 })?;
1059
1060 let Some(record) = file_record else {
1061 info!("File not found: run_id={}, path={}", run_id, file_path);
1062 return Err(StatusCode::NOT_FOUND);
1063 };
1064
1065 let file_data = tokio::fs::read(&record.storage_path).await.map_err(|e| {
1067 error!(
1068 "Failed to read file from storage {}: {}",
1069 record.storage_path, e
1070 );
1071 StatusCode::INTERNAL_SERVER_ERROR
1072 })?;
1073
1074 info!(
1075 "Successfully read file: {} bytes from {}",
1076 file_data.len(),
1077 record.storage_path
1078 );
1079
1080 let content_type = record.content_type.unwrap_or_else(|| {
1082 mime_guess::from_path(&record.file_path)
1083 .first_or_octet_stream()
1084 .to_string()
1085 });
1086
1087 let filename = std::path::Path::new(&record.file_path)
1089 .file_name()
1090 .and_then(|n| n.to_str())
1091 .unwrap_or("download");
1092
1093 Response::builder()
1095 .status(StatusCode::OK)
1096 .header(header::CONTENT_TYPE, content_type)
1097 .header(
1098 header::CONTENT_DISPOSITION,
1099 format!("inline; filename=\"{filename}\""),
1100 )
1101 .body(Body::from(file_data))
1102 .map_err(|e| {
1103 error!("Failed to build response: {}", e);
1104 StatusCode::INTERNAL_SERVER_ERROR
1105 })
1106}