Skip to main content

mini_apm_admin/api/
ingest.rs

1use axum::{Extension, Json, extract::State, http::StatusCode};
2use serde::Deserialize;
3
4use mini_apm::{
5    DbPool,
6    models::{deploy, error as app_error, span},
7};
8use crate::api::auth::ProjectContext;
9
10#[derive(Debug, Deserialize)]
11pub struct IncomingErrorBatch {
12    pub errors: Vec<app_error::IncomingError>,
13}
14
15pub async fn ingest_spans(
16    State(pool): State<DbPool>,
17    Extension(ctx): Extension<ProjectContext>,
18    Json(otlp_request): Json<span::OtlpTraceRequest>,
19) -> StatusCode {
20    match span::insert_otlp_batch(&pool, &otlp_request, ctx.project_id) {
21        Ok(count) => {
22            tracing::debug!("Ingested {} spans (project_id={:?})", count, ctx.project_id);
23            StatusCode::ACCEPTED
24        }
25        Err(e) => {
26            tracing::error!("Failed to ingest spans: {}", e);
27            StatusCode::INTERNAL_SERVER_ERROR
28        }
29    }
30}
31
32pub async fn ingest_deploys(
33    State(pool): State<DbPool>,
34    Extension(ctx): Extension<ProjectContext>,
35    Json(incoming): Json<deploy::IncomingDeploy>,
36) -> StatusCode {
37    match deploy::insert(&pool, &incoming, ctx.project_id) {
38        Ok(id) => {
39            tracing::info!(
40                "Recorded deploy id={} git_sha={} (project_id={:?})",
41                id,
42                incoming.git_sha,
43                ctx.project_id
44            );
45            StatusCode::ACCEPTED
46        }
47        Err(e) => {
48            tracing::error!("Failed to record deploy: {}", e);
49            StatusCode::INTERNAL_SERVER_ERROR
50        }
51    }
52}
53
54pub async fn ingest_errors(
55    State(pool): State<DbPool>,
56    Extension(ctx): Extension<ProjectContext>,
57    Json(incoming): Json<app_error::IncomingError>,
58) -> StatusCode {
59    match app_error::insert(&pool, &incoming, ctx.project_id) {
60        Ok(id) => {
61            tracing::debug!(
62                "Recorded error id={} class={} (project_id={:?})",
63                id,
64                incoming.exception_class,
65                ctx.project_id
66            );
67            StatusCode::ACCEPTED
68        }
69        Err(e) => {
70            tracing::error!("Failed to record error: {}", e);
71            StatusCode::INTERNAL_SERVER_ERROR
72        }
73    }
74}
75
76pub async fn ingest_errors_batch(
77    State(pool): State<DbPool>,
78    Extension(ctx): Extension<ProjectContext>,
79    Json(batch): Json<IncomingErrorBatch>,
80) -> StatusCode {
81    let mut success_count = 0;
82    let mut error_count = 0;
83
84    for error in batch.errors {
85        match app_error::insert(&pool, &error, ctx.project_id) {
86            Ok(_) => success_count += 1,
87            Err(e) => {
88                tracing::warn!("Failed to record error: {}", e);
89                error_count += 1;
90            }
91        }
92    }
93
94    tracing::debug!(
95        "Ingested {} errors, {} failed (project_id={:?})",
96        success_count,
97        error_count,
98        ctx.project_id
99    );
100
101    if error_count > 0 && success_count == 0 {
102        StatusCode::INTERNAL_SERVER_ERROR
103    } else {
104        StatusCode::ACCEPTED
105    }
106}