1use askama::Template;
2use askama_web::WebTemplate;
3
4#[expect(
5 clippy::unnecessary_wraps,
6 reason = "Askama filter_fn macro generates code that triggers this lint, but Result return type is required by the Askama filter API"
7)]
8mod filters {
9 use askama::filter_fn;
10
11 #[filter_fn]
12 pub fn format_command(s: &str, _env: &dyn askama::Values) -> ::askama::Result<String> {
13 Ok(serde_json::from_str::<Vec<String>>(s).map_or_else(
15 |_| s.to_string(),
16 |cmd_array| {
17 shlex::try_join(cmd_array.iter().map(String::as_str))
18 .unwrap_or_else(|_| cmd_array.join(" "))
19 },
20 ))
21 }
22
23 #[filter_fn]
24 pub fn pretty_json<T: std::fmt::Display>(
25 s: T,
26 _env: &dyn askama::Values,
27 ) -> ::askama::Result<String> {
28 let s_str = s.to_string();
29 Ok(serde_json::from_str::<serde_json::Value>(&s_str)
31 .ok()
32 .and_then(|value| serde_json::to_string_pretty(&value).ok())
33 .unwrap_or(s_str))
34 }
35}
36
37use axum::{
38 Router,
39 body::Body,
40 extract::{DefaultBodyLimit, Multipart, Path, Query, State},
41 http::{StatusCode, header},
42 response::{IntoResponse, Json, Response},
43 routing::{get, post},
44};
45use capsula_api_types::VaultInfo;
46use serde_json::json;
47use sha2::{Digest, Sha256};
48use sqlx::PgPool;
49use std::collections::VecDeque;
50use std::path::PathBuf;
51use tower_http::services::ServeDir;
52use tracing::{error, info, warn};
53
/// Shared state handed to every request handler via axum's `State` extractor.
#[derive(Clone)]
pub struct AppState {
    /// PostgreSQL connection pool used for all database queries.
    pub pool: PgPool,
    /// Root directory under which uploaded file contents are written.
    pub storage_path: PathBuf,
}
59
60mod models;
61
/// Askama template for `index.html`; it takes no data.
#[derive(Template, WebTemplate)]
#[template(path = "index.html")]
struct IndexTemplate;
65
/// Askama template for `vaults.html`.
#[derive(Template, WebTemplate)]
#[template(path = "vaults.html")]
struct VaultsTemplate {
    /// Vaults to list, each with its run count.
    vaults: Vec<VaultInfo>,
}
71
/// Askama template for `runs.html`, showing one page of runs.
#[derive(Template, WebTemplate)]
#[template(path = "runs.html")]
struct RunsTemplate {
    /// Runs on the current page.
    runs: Vec<models::Run>,
    /// Vault filter taken from the query string, if any.
    vault: Option<String>,
    /// Current page number; the handler computes it as 1-based.
    page: i64,
    /// Total number of pages for the current filter.
    total_pages: i64,
}
80
/// Askama template for `run_detail.html`, showing a single run.
#[derive(Template, WebTemplate)]
#[template(path = "run_detail.html")]
struct RunDetailTemplate {
    /// The run being displayed.
    run: models::Run,
    /// Hook outputs stored with phase `pre`.
    pre_run_hooks: Vec<models::HookOutput>,
    /// Hook outputs stored with phase `post`.
    post_run_hooks: Vec<models::HookOutput>,
    /// Files captured for this run.
    files: Vec<models::CapturedFile>,
}
89
/// Askama template for `error.html`, used for HTML error pages.
#[derive(Template, WebTemplate)]
#[template(path = "error.html")]
struct ErrorTemplate {
    /// Numeric HTTP status code shown on the page.
    status_code: u16,
    /// Short heading for the error.
    title: String,
    /// Longer human-readable explanation.
    message: String,
}
97
98pub async fn create_pool(database_url: &str, max_connections: u32) -> Result<PgPool, sqlx::Error> {
99 sqlx::postgres::PgPoolOptions::new()
100 .max_connections(max_connections)
101 .acquire_timeout(std::time::Duration::from_secs(3))
102 .connect(database_url)
103 .await
104}
105
106pub fn build_app(pool: PgPool, storage_path: PathBuf, max_body_size: usize) -> Router {
107 let static_dir: PathBuf = std::env::var("CAPSULA_STATIC_DIR")
108 .expect("CAPSULA_STATIC_DIR environment variable must be set")
109 .into();
110
111 let state = AppState { pool, storage_path };
112
113 Router::new()
114 .route("/", get(index))
115 .route("/vaults", get(vaults_page))
116 .route("/runs", get(runs_page))
117 .route("/runs/{id}", get(run_detail_page))
118 .route("/health", get(health_check))
119 .route("/api/v1/vaults", get(list_vaults))
120 .route("/api/v1/vaults/{name}", get(get_vault_info))
121 .route("/api/v1/runs", post(create_run).get(list_runs))
122 .route("/api/v1/runs/{id}", get(get_run))
123 .route("/api/v1/runs/{id}/files/{*path}", get(download_file))
124 .route(
125 "/api/v1/upload",
126 post(upload_files).layer(DefaultBodyLimit::max(max_body_size)),
127 )
128 .nest_service("/static", ServeDir::new(static_dir))
129 .fallback(not_found)
130 .with_state(state)
131}
132
/// Handler for `/`: renders [`IndexTemplate`].
async fn index() -> impl IntoResponse {
    IndexTemplate
}
136
137async fn not_found() -> impl IntoResponse {
138 (
139 StatusCode::NOT_FOUND,
140 ErrorTemplate {
141 status_code: 404,
142 title: "Page Not Found".to_string(),
143 message: "The page you are looking for does not exist.".to_string(),
144 },
145 )
146}
147
148async fn vaults_page(State(state): State<AppState>) -> impl IntoResponse {
149 info!("Rendering vaults page");
150
151 let result = sqlx::query!(
152 r#"
153 SELECT vault as name, COUNT(*) as "run_count!"
154 FROM runs
155 GROUP BY vault
156 ORDER BY vault
157 "#
158 )
159 .fetch_all(&state.pool)
160 .await;
161
162 match result {
163 Ok(rows) => {
164 let vaults: Vec<VaultInfo> = rows
165 .into_iter()
166 .map(|row| VaultInfo {
167 name: row.name,
168 run_count: row.run_count,
169 })
170 .collect();
171 VaultsTemplate { vaults }
172 }
173 Err(e) => {
174 error!("Failed to fetch vaults: {}", e);
175 VaultsTemplate { vaults: Vec::new() }
177 }
178 }
179}
180
181async fn runs_page(
182 State(state): State<AppState>,
183 Query(params): Query<models::ListRunsQuery>,
184) -> impl IntoResponse {
185 let page = params.offset.unwrap_or(0) / params.limit.unwrap_or(50) + 1;
186 let limit = params.limit.unwrap_or(50);
187 let offset = (page - 1) * limit;
188
189 info!(
190 "Rendering runs page: page={}, vault={:?}",
191 page, params.vault
192 );
193
194 let total_count = if let Some(ref vault) = params.vault {
196 sqlx::query_scalar!(
197 r#"
198 SELECT COUNT(*)::bigint
199 FROM runs
200 WHERE vault = $1
201 "#,
202 vault
203 )
204 .fetch_one(&state.pool)
205 .await
206 .unwrap_or(Some(0))
207 .unwrap_or(0)
208 } else {
209 sqlx::query_scalar!(
210 r#"
211 SELECT COUNT(*)::bigint
212 FROM runs
213 "#
214 )
215 .fetch_one(&state.pool)
216 .await
217 .unwrap_or(Some(0))
218 .unwrap_or(0)
219 };
220
221 let total_pages = (total_count + limit - 1) / limit;
222
223 let runs_result = if let Some(ref vault) = params.vault {
225 sqlx::query_as!(
226 models::Run,
227 r#"
228 SELECT id, name, timestamp, command, vault, project_root,
229 exit_code, duration_ms, stdout, stderr,
230 created_at, updated_at
231 FROM runs
232 WHERE vault = $1
233 ORDER BY timestamp DESC
234 LIMIT $2 OFFSET $3
235 "#,
236 vault,
237 limit,
238 offset
239 )
240 .fetch_all(&state.pool)
241 .await
242 } else {
243 sqlx::query_as!(
244 models::Run,
245 r#"
246 SELECT id, name, timestamp, command, vault, project_root,
247 exit_code, duration_ms, stdout, stderr,
248 created_at, updated_at
249 FROM runs
250 ORDER BY timestamp DESC
251 LIMIT $1 OFFSET $2
252 "#,
253 limit,
254 offset
255 )
256 .fetch_all(&state.pool)
257 .await
258 };
259
260 match runs_result {
261 Ok(runs) => RunsTemplate {
262 runs,
263 vault: params.vault,
264 page,
265 total_pages,
266 },
267 Err(e) => {
268 error!("Failed to fetch runs: {}", e);
269 RunsTemplate {
270 runs: Vec::new(),
271 vault: params.vault,
272 page: 1,
273 total_pages: 1,
274 }
275 }
276 }
277}
278
279async fn run_detail_page(
280 State(state): State<AppState>,
281 Path(id): Path<String>,
282) -> Result<RunDetailTemplate, StatusCode> {
283 info!("Rendering run detail page for: {}", id);
284
285 let run = sqlx::query_as!(
287 models::Run,
288 r#"
289 SELECT id, name, timestamp, command, vault, project_root,
290 exit_code, duration_ms, stdout, stderr,
291 created_at, updated_at
292 FROM runs
293 WHERE id = $1
294 "#,
295 id
296 )
297 .fetch_optional(&state.pool)
298 .await
299 .map_err(|e| {
300 error!("Database error while fetching run: {}", e);
301 StatusCode::INTERNAL_SERVER_ERROR
302 })?
303 .ok_or_else(|| {
304 info!("Run not found: {}", id);
305 StatusCode::NOT_FOUND
306 })?;
307
308 let hook_outputs_result = sqlx::query_as!(
310 models::RunOutputRow,
311 r#"
312 SELECT phase, hook_id, config, output, success, error
313 FROM run_outputs
314 WHERE run_id = $1
315 ORDER BY id
316 "#,
317 id
318 )
319 .fetch_all(&state.pool)
320 .await;
321
322 let (pre_run_hooks, post_run_hooks) = match hook_outputs_result {
323 Ok(rows) => {
324 let mut pre_hooks = Vec::new();
325 let mut post_hooks = Vec::new();
326
327 for row in rows {
328 let hook_output = models::HookOutput {
329 meta: models::HookMeta {
330 id: row.hook_id,
331 config: row.config,
332 success: row.success,
333 error: row.error,
334 },
335 output: row.output,
336 };
337
338 if row.phase == "pre" {
339 pre_hooks.push(hook_output);
340 } else if row.phase == "post" {
341 post_hooks.push(hook_output);
342 } else {
343 warn!("Unknown hook phase: {}", row.phase);
344 }
345 }
346
347 (pre_hooks, post_hooks)
348 }
349 Err(e) => {
350 error!("Failed to fetch hook outputs: {}", e);
351 (Vec::new(), Vec::new())
352 }
353 };
354
355 let files_result = sqlx::query_as!(
357 models::CapturedFile,
358 r#"
359 SELECT path, size, hash, storage_path, content_type
360 FROM captured_files
361 WHERE run_id = $1
362 ORDER BY path
363 "#,
364 id
365 )
366 .fetch_all(&state.pool)
367 .await;
368
369 let files = match files_result {
370 Ok(files) => files,
371 Err(e) => {
372 error!("Failed to fetch captured files: {}", e);
373 Vec::new()
374 }
375 };
376
377 Ok(RunDetailTemplate {
378 run,
379 pre_run_hooks,
380 post_run_hooks,
381 files,
382 })
383}
384
385async fn health_check(State(state): State<AppState>) -> impl IntoResponse {
386 match sqlx::query("SELECT 1").fetch_one(&state.pool).await {
387 Ok(_) => Json(json!({
388 "status": "ok",
389 "database": "connected"
390 })),
391 Err(e) => Json(json!({
392 "status": "error",
393 "database": "disconnected",
394 "error": e.to_string()
395 })),
396 }
397}
398
399async fn list_vaults(State(state): State<AppState>) -> impl IntoResponse {
400 info!("Listing all vaults");
401
402 let result = sqlx::query!(
403 r#"
404 SELECT vault as name, COUNT(*) as "run_count!"
405 FROM runs
406 GROUP BY vault
407 ORDER BY vault
408 "#
409 )
410 .fetch_all(&state.pool)
411 .await;
412
413 match result {
414 Ok(rows) => {
415 let vaults: Vec<VaultInfo> = rows
416 .into_iter()
417 .map(|row| VaultInfo {
418 name: row.name,
419 run_count: row.run_count,
420 })
421 .collect();
422 info!("Found {} vaults", vaults.len());
423 let response = capsula_api_types::VaultsResponse {
424 status: "ok".to_string(),
425 vaults,
426 };
427 Json(serde_json::to_value(response).expect("Failed to serialize VaultsResponse"))
428 }
429 Err(e) => {
430 error!("Failed to list vaults: {}", e);
431 let response = capsula_api_types::ErrorResponse {
432 status: "error".to_string(),
433 error: e.to_string(),
434 };
435 Json(serde_json::to_value(response).expect("Failed to serialize ErrorResponse"))
436 }
437 }
438}
439
440async fn get_vault_info(
441 State(state): State<AppState>,
442 Path(name): Path<String>,
443) -> impl IntoResponse {
444 info!("Getting vault info: {}", name);
445
446 let result = sqlx::query!(
447 r#"
448 SELECT vault as name, COUNT(*) as "run_count!"
449 FROM runs
450 WHERE vault = $1
451 GROUP BY vault
452 "#,
453 name
454 )
455 .fetch_optional(&state.pool)
456 .await;
457
458 match result {
459 Ok(Some(row)) => {
460 let vault = VaultInfo {
461 name: row.name,
462 run_count: row.run_count,
463 };
464 info!("Found vault: {} with {} runs", vault.name, vault.run_count);
465 let response = capsula_api_types::VaultExistsResponse {
466 status: "ok".to_string(),
467 exists: true,
468 vault: Some(vault),
469 };
470 Json(serde_json::to_value(response).expect("Failed to serialize VaultExistsResponse"))
471 }
472 Ok(None) => {
473 info!("Vault not found: {}", name);
474 let response = capsula_api_types::VaultExistsResponse {
475 status: "ok".to_string(),
476 exists: false,
477 vault: None,
478 };
479 Json(serde_json::to_value(response).expect("Failed to serialize VaultExistsResponse"))
480 }
481 Err(e) => {
482 error!("Failed to get vault info: {}", e);
483 let response = capsula_api_types::ErrorResponse {
484 status: "error".to_string(),
485 error: e.to_string(),
486 };
487 Json(serde_json::to_value(response).expect("Failed to serialize ErrorResponse"))
488 }
489 }
490}
491
492async fn list_runs(
493 State(state): State<AppState>,
494 Query(params): Query<models::ListRunsQuery>,
495) -> impl IntoResponse {
496 let limit = params.limit.unwrap_or(100);
497 let offset = params.offset.unwrap_or(0);
498
499 if let Some(ref vault) = params.vault {
500 info!(
501 "Listing runs for vault: {} (limit={}, offset={})",
502 vault, limit, offset
503 );
504 } else {
505 info!("Listing all runs (limit={}, offset={})", limit, offset);
506 }
507
508 let result = if let Some(vault) = params.vault {
509 sqlx::query_as!(
510 models::Run,
511 r#"
512 SELECT id, name, timestamp, command, vault, project_root,
513 exit_code, duration_ms, stdout, stderr,
514 created_at, updated_at
515 FROM runs
516 WHERE vault = $1
517 ORDER BY timestamp DESC
518 LIMIT $2 OFFSET $3
519 "#,
520 vault,
521 limit,
522 offset
523 )
524 .fetch_all(&state.pool)
525 .await
526 } else {
527 sqlx::query_as!(
528 models::Run,
529 r#"
530 SELECT id, name, timestamp, command, vault, project_root,
531 exit_code, duration_ms, stdout, stderr,
532 created_at, updated_at
533 FROM runs
534 ORDER BY timestamp DESC
535 LIMIT $1 OFFSET $2
536 "#,
537 limit,
538 offset
539 )
540 .fetch_all(&state.pool)
541 .await
542 };
543
544 match result {
545 Ok(runs) => {
546 info!("Found {} runs", runs.len());
547 Json(json!({
548 "status": "ok",
549 "runs": runs,
550 "limit": limit,
551 "offset": offset
552 }))
553 }
554 Err(e) => {
555 error!("Failed to list runs: {}", e);
556 Json(json!({
557 "status": "error",
558 "error": e.to_string()
559 }))
560 }
561 }
562}
563
564async fn create_run(
565 State(state): State<AppState>,
566 Json(request): Json<models::CreateRunRequest>,
567) -> impl IntoResponse {
568 info!("Creating run: {}", request.id);
569
570 let result = sqlx::query!(
571 r#"
572 INSERT INTO runs (
573 id, name, timestamp, command, vault, project_root,
574 exit_code, duration_ms, stdout, stderr
575 ) VALUES (
576 $1, $2, $3, $4, $5, $6,
577 $7, $8, $9, $10
578 )
579 "#,
580 request.id,
581 request.name,
582 request.timestamp,
583 request.command,
584 request.vault,
585 request.project_root,
586 request.exit_code,
587 request.duration_ms,
588 request.stdout,
589 request.stderr
590 )
591 .execute(&state.pool)
592 .await;
593
594 match result {
595 Ok(_) => {
596 info!("Run created successfully");
597 (
598 StatusCode::CREATED,
599 Json(json!({
600 "status": "created",
601 "run": request
602 })),
603 )
604 }
605 Err(e) => {
606 error!("Failed to insert run: {}", e);
607 let status = if e.to_string().contains("duplicate key") {
608 StatusCode::CONFLICT
609 } else {
610 StatusCode::INTERNAL_SERVER_ERROR
611 };
612 (
613 status,
614 Json(json!({
615 "status": "error",
616 "error": e.to_string()
617 })),
618 )
619 }
620 }
621}
622
623async fn get_run(State(state): State<AppState>, Path(id): Path<String>) -> impl IntoResponse {
624 info!("Getting run: {}", id);
625
626 let result = sqlx::query_as!(
627 models::Run,
628 r#"
629 SELECT id, name, timestamp, command, vault, project_root,
630 exit_code, duration_ms, stdout, stderr,
631 created_at, updated_at
632 FROM runs
633 WHERE id = $1
634 "#,
635 id
636 )
637 .fetch_optional(&state.pool)
638 .await;
639
640 match result {
641 Ok(Some(run)) => {
642 info!("Found run: {}", run.id);
643
644 let hook_outputs_result = sqlx::query_as!(
646 models::RunOutputRow,
647 r#"
648 SELECT phase, hook_id, config, output, success, error
649 FROM run_outputs
650 WHERE run_id = $1
651 ORDER BY id
652 "#,
653 id
654 )
655 .fetch_all(&state.pool)
656 .await;
657
658 let (pre_run_hooks, post_run_hooks) = match hook_outputs_result {
659 Ok(rows) => {
660 let mut pre_hooks = Vec::new();
661 let mut post_hooks = Vec::new();
662
663 for row in rows {
664 let hook_output = models::HookOutput {
665 meta: models::HookMeta {
666 id: row.hook_id,
667 config: row.config,
668 success: row.success,
669 error: row.error,
670 },
671 output: row.output,
672 };
673
674 if row.phase == "pre" {
675 pre_hooks.push(hook_output);
676 } else if row.phase == "post" {
677 post_hooks.push(hook_output);
678 } else {
679 warn!("Unknown hook phase: {}", row.phase);
680 }
681 }
682
683 info!(
684 "Found {} pre-run hooks and {} post-run hooks",
685 pre_hooks.len(),
686 post_hooks.len()
687 );
688 (pre_hooks, post_hooks)
689 }
690 Err(e) => {
691 error!("Failed to fetch hook outputs: {}", e);
692 (Vec::new(), Vec::new())
693 }
694 };
695
696 Json(json!({
697 "status": "ok",
698 "run": run,
699 "pre_run_hooks": pre_run_hooks,
700 "post_run_hooks": post_run_hooks
701 }))
702 }
703 Ok(None) => {
704 info!("Run not found: {}", id);
705 Json(json!({
706 "status": "not_found",
707 "error": format!("Run with id {} not found", id)
708 }))
709 }
710 Err(e) => {
711 error!("Failed to retrieve run: {}", e);
712 Json(json!({
713 "status": "error",
714 "error": e.to_string()
715 }))
716 }
717 }
718}
719
720#[expect(clippy::too_many_lines, reason = "TODO: Refactor later")]
721#[expect(
722 clippy::else_if_without_else,
723 reason = "There is `continue` or `return` in each branch, so `else` is redundant"
724)]
725async fn upload_files(
726 State(state): State<AppState>,
727 mut multipart: Multipart,
728) -> impl IntoResponse {
729 let storage_path = &state.storage_path;
730 info!("Received file upload request");
731
732 if let Err(e) = tokio::fs::create_dir_all(&storage_path).await {
733 error!(
734 "Failed to create storage directory at {}: {}",
735 storage_path.display(),
736 e
737 );
738 return Json(json!({
739 "status": "error",
740 "error": format!("Failed to create storage directory: {}", e)
741 }));
742 }
743
744 let mut files_processed = 0;
745 let mut total_bytes = 0u64;
746 let mut run_id: Option<String> = None;
747 let mut pending_paths: VecDeque<String> = VecDeque::new();
748 let mut pre_run_hooks: Option<Vec<models::HookOutput>> = None;
749 let mut post_run_hooks: Option<Vec<models::HookOutput>> = None;
750
751 while let Ok(Some(field)) = multipart.next_field().await {
752 let field_name = field.name().unwrap_or("unknown").to_string();
753
754 if field_name == "run_id" {
755 match field.text().await {
756 Ok(text) => {
757 run_id = Some(text);
758 continue;
759 }
760 Err(e) => {
761 error!("Failed to read run_id field: {}", e);
762 return Json(json!({
763 "status": "error",
764 "error": format!("Failed to read run_id: {}", e)
765 }));
766 }
767 }
768 } else if field_name == "path" {
769 match field.text().await {
770 Ok(text) => {
771 pending_paths.push_back(text);
772 continue;
773 }
774 Err(e) => {
775 error!("Failed to read path field: {}", e);
776 return Json(json!({
777 "status": "error",
778 "error": format!("Failed to read path: {}", e)
779 }));
780 }
781 }
782 } else if field_name == "pre_run" {
783 match field.text().await {
784 Ok(text) => match serde_json::from_str::<Vec<models::HookOutput>>(&text) {
785 Ok(hooks) => {
786 info!("Parsed {} pre-run hooks", hooks.len());
787 pre_run_hooks = Some(hooks);
788 continue;
789 }
790 Err(e) => {
791 error!("Failed to parse pre_run JSON: {}", e);
792 return Json(json!({
793 "status": "error",
794 "error": format!("Failed to parse pre_run JSON: {}", e)
795 }));
796 }
797 },
798 Err(e) => {
799 error!("Failed to read pre_run field: {}", e);
800 return Json(json!({
801 "status": "error",
802 "error": format!("Failed to read pre_run: {}", e)
803 }));
804 }
805 }
806 } else if field_name == "post_run" {
807 match field.text().await {
808 Ok(text) => match serde_json::from_str::<Vec<models::HookOutput>>(&text) {
809 Ok(hooks) => {
810 info!("Parsed {} post-run hooks", hooks.len());
811 post_run_hooks = Some(hooks);
812 continue;
813 }
814 Err(e) => {
815 error!("Failed to parse post_run JSON: {}", e);
816 return Json(json!({
817 "status": "error",
818 "error": format!("Failed to parse post_run JSON: {}", e)
819 }));
820 }
821 },
822 Err(e) => {
823 error!("Failed to read post_run field: {}", e);
824 return Json(json!({
825 "status": "error",
826 "error": format!("Failed to read post_run: {}", e)
827 }));
828 }
829 }
830 }
831
832 let file_name = field.file_name().unwrap_or("unknown").to_string();
833 let content_type = field
834 .content_type()
835 .unwrap_or("application/octet-stream")
836 .to_string();
837
838 info!(
839 "Processing file: field_name={}, file_name={}, content_type={}",
840 field_name, file_name, content_type
841 );
842
843 match field.bytes().await {
844 Ok(data) => {
845 let size = data.len();
846 total_bytes += size as u64;
847 let Ok(size_i64) = i64::try_from(size) else {
848 error!("File too large to store size in database: {} bytes", size);
849 return Json(json!({
850 "status": "error",
851 "error": "File too large to store size in database"
852 }));
853 };
854
855 let mut hasher = Sha256::new();
856 hasher.update(&data);
857 let hash = format!("{:x}", hasher.finalize());
858
859 info!("File hash: {}, size: {} bytes", hash, size);
860
861 let hash_dir = &hash[0..2];
862 let file_storage_dir = storage_path.join(hash_dir);
863 if let Err(e) = tokio::fs::create_dir_all(&file_storage_dir).await {
864 error!(
865 "Failed to create hash directory at {}: {}",
866 file_storage_dir.display(),
867 e
868 );
869 return Json(json!({
870 "status": "error",
871 "error": format!("Failed to create storage directory: {}", e)
872 }));
873 }
874
875 let file_storage_path = file_storage_dir.join(&hash);
876 let storage_path_str = file_storage_path.to_string_lossy().to_string();
877
878 if file_storage_path.exists() {
879 info!("File already exists (deduplicated): {}", storage_path_str);
880 } else {
881 if let Err(e) = tokio::fs::write(&file_storage_path, &data).await {
882 error!("Failed to write file: {}", e);
883 return Json(json!({
884 "status": "error",
885 "error": format!("Failed to write file: {}", e)
886 }));
887 }
888 info!("Saved new file: {}", storage_path_str);
889 }
890
891 if let Some(ref rid) = run_id {
892 let relative_path = pending_paths
893 .pop_front()
894 .or_else(|| {
895 if file_name == "unknown" {
896 None
897 } else {
898 Some(file_name.clone())
899 }
900 })
901 .unwrap_or_else(|| {
902 if field_name != "unknown" && field_name != "file" {
903 field_name.clone()
904 } else {
905 format!("file-{}", files_processed + 1)
906 }
907 });
908
909 let result = sqlx::query!(
910 r#"
911 INSERT INTO captured_files (run_id, path, size, hash, storage_path, content_type)
912 VALUES ($1, $2, $3, $4, $5, $6)
913 ON CONFLICT (run_id, path) DO UPDATE
914 SET size = EXCLUDED.size,
915 hash = EXCLUDED.hash,
916 storage_path = EXCLUDED.storage_path,
917 content_type = EXCLUDED.content_type
918 "#,
919 rid,
920 relative_path,
921 size_i64,
922 hash,
923 storage_path_str,
924 content_type
925 )
926 .execute(&state.pool)
927 .await;
928
929 match result {
930 Ok(_) => {
931 info!("Stored file metadata in database");
932 }
933 Err(e) => {
934 error!("Failed to store file metadata: {}", e);
935 return Json(json!({
936 "status": "error",
937 "error": format!("Failed to store file metadata: {}", e)
938 }));
939 }
940 }
941 }
942
943 files_processed += 1;
944 info!("Successfully processed file: {} bytes", size);
945 }
946 Err(e) => {
947 error!("Failed to read file field: {}", e);
948 return Json(json!({
949 "status": "error",
950 "error": format!("Failed to read file: {}", e)
951 }));
952 }
953 }
954 }
955
956 info!(
957 "Upload complete: {} files, {} bytes total",
958 files_processed, total_bytes
959 );
960
961 let mut pre_run_count = 0;
963 let mut post_run_count = 0;
964
965 if let Some(ref rid) = run_id {
966 if let Some(hooks) = pre_run_hooks {
967 for hook in hooks {
968 let result = sqlx::query!(
969 r#"
970 INSERT INTO run_outputs (run_id, phase, hook_id, config, output, success, error)
971 VALUES ($1, 'pre', $2, $3, $4, $5, $6)
972 "#,
973 rid,
974 hook.meta.id,
975 hook.meta.config,
976 hook.output,
977 hook.meta.success,
978 hook.meta.error
979 )
980 .execute(&state.pool)
981 .await;
982
983 match result {
984 Ok(_) => {
985 pre_run_count += 1;
986 info!("Stored pre-run hook: {}", hook.meta.id);
987 }
988 Err(e) => {
989 error!("Failed to store pre-run hook {}: {}", hook.meta.id, e);
990 return Json(json!({
991 "status": "error",
992 "error": format!("Failed to store pre-run hook {}: {}", hook.meta.id, e)
993 }));
994 }
995 }
996 }
997 }
998
999 if let Some(hooks) = post_run_hooks {
1000 for hook in hooks {
1001 let result = sqlx::query!(
1002 r#"
1003 INSERT INTO run_outputs (run_id, phase, hook_id, config, output, success, error)
1004 VALUES ($1, 'post', $2, $3, $4, $5, $6)
1005 "#,
1006 rid,
1007 hook.meta.id,
1008 hook.meta.config,
1009 hook.output,
1010 hook.meta.success,
1011 hook.meta.error
1012 )
1013 .execute(&state.pool)
1014 .await;
1015
1016 match result {
1017 Ok(_) => {
1018 post_run_count += 1;
1019 info!("Stored post-run hook: {}", hook.meta.id);
1020 }
1021 Err(e) => {
1022 error!("Failed to store post-run hook {}: {}", hook.meta.id, e);
1023 return Json(json!({
1024 "status": "error",
1025 "error": format!("Failed to store post-run hook {}: {}", hook.meta.id, e)
1026 }));
1027 }
1028 }
1029 }
1030 }
1031 }
1032
1033 info!(
1034 "Hook outputs stored: {} pre-run, {} post-run",
1035 pre_run_count, post_run_count
1036 );
1037
1038 let response = capsula_api_types::UploadResponse {
1039 status: "ok".to_string(),
1040 files_processed,
1041 total_bytes,
1042 pre_run_hooks: pre_run_count,
1043 post_run_hooks: post_run_count,
1044 };
1045 Json(serde_json::to_value(response).expect("Failed to serialize UploadResponse"))
1046}
1047
1048async fn download_file(
1049 State(state): State<AppState>,
1050 Path((run_id, file_path)): Path<(String, String)>,
1051) -> Result<Response, StatusCode> {
1052 info!("Downloading file: run_id={}, path={}", run_id, file_path);
1053
1054 let file_record = sqlx::query!(
1056 r#"
1057 SELECT storage_path, content_type, path as file_path
1058 FROM captured_files
1059 WHERE run_id = $1 AND path = $2
1060 "#,
1061 run_id,
1062 file_path
1063 )
1064 .fetch_optional(&state.pool)
1065 .await
1066 .map_err(|e| {
1067 error!("Database error while fetching file metadata: {}", e);
1068 StatusCode::INTERNAL_SERVER_ERROR
1069 })?;
1070
1071 let Some(record) = file_record else {
1072 info!("File not found: run_id={}, path={}", run_id, file_path);
1073 return Err(StatusCode::NOT_FOUND);
1074 };
1075
1076 let file_data = tokio::fs::read(&record.storage_path).await.map_err(|e| {
1078 error!(
1079 "Failed to read file from storage {}: {}",
1080 record.storage_path, e
1081 );
1082 StatusCode::INTERNAL_SERVER_ERROR
1083 })?;
1084
1085 info!(
1086 "Successfully read file: {} bytes from {}",
1087 file_data.len(),
1088 record.storage_path
1089 );
1090
1091 let content_type = record.content_type.unwrap_or_else(|| {
1093 mime_guess::from_path(&record.file_path)
1094 .first_or_octet_stream()
1095 .to_string()
1096 });
1097
1098 let filename = std::path::Path::new(&record.file_path)
1100 .file_name()
1101 .and_then(|n| n.to_str())
1102 .unwrap_or("download");
1103
1104 Response::builder()
1106 .status(StatusCode::OK)
1107 .header(header::CONTENT_TYPE, content_type)
1108 .header(
1109 header::CONTENT_DISPOSITION,
1110 format!("inline; filename=\"{filename}\""),
1111 )
1112 .body(Body::from(file_data))
1113 .map_err(|e| {
1114 error!("Failed to build response: {}", e);
1115 StatusCode::INTERNAL_SERVER_ERROR
1116 })
1117}