use axum::{
extract::{Path, Query, State},
http::StatusCode,
response::IntoResponse,
Json,
};
use serde::Deserialize;
use crate::error::AppResult;
use crate::services::result_store::{parse_noetl_ref, PutResultBody, ResultStoreService};
#[derive(Clone)]
pub struct ResultStoreDeps {
pub service: ResultStoreService,
}
pub async fn put_result(
State(deps): State<ResultStoreDeps>,
Path(execution_id): Path<i64>,
Json(body): Json<PutResultBody>,
) -> AppResult<impl IntoResponse> {
let span = tracing::info_span!(
"result_store.put",
execution_id,
name = %body.name,
scope = %body.scope,
);
let _g = span.enter();
let t0 = std::time::Instant::now();
let result = deps.service.put(execution_id, &body).await;
let elapsed = t0.elapsed().as_secs_f64();
match result {
Ok(resp) => {
tracing::info!(
execution_id,
name = %body.name,
bytes = resp.bytes,
result_ref = %resp.r#ref,
duration_seconds = elapsed,
"result_store.put: stored",
);
crate::metrics::record_result_store_put(elapsed, resp.bytes as usize, "ok");
Ok((StatusCode::OK, Json(resp)))
}
Err(e) => {
tracing::warn!(
execution_id,
name = %body.name,
error = %e,
duration_seconds = elapsed,
"result_store.put: failed",
);
crate::metrics::record_result_store_put(elapsed, 0, "error");
Err(e)
}
}
}
#[derive(Debug, Deserialize)]
pub struct ResolveQuery {
pub r#ref: String,
}
pub async fn resolve_ref(
State(deps): State<ResultStoreDeps>,
Query(params): Query<ResolveQuery>,
) -> AppResult<impl IntoResponse> {
let span = tracing::info_span!(
"result_store.resolve",
noetl_ref = %params.r#ref,
);
let _g = span.enter();
let t0 = std::time::Instant::now();
let parsed = match parse_noetl_ref(¶ms.r#ref) {
Ok(r) => r,
Err(msg) => {
tracing::warn!(
noetl_ref = %params.r#ref,
error = %msg,
"result_store.resolve: malformed URI",
);
crate::metrics::record_result_store_resolve(t0.elapsed().as_secs_f64(), "bad_request");
return Ok((
StatusCode::BAD_REQUEST,
Json(serde_json::json!({"error": msg})),
)
.into_response());
}
};
let execution_id = parsed.execution_id;
let name = parsed.name.clone();
let result = deps.service.resolve(&parsed).await;
let elapsed = t0.elapsed().as_secs_f64();
match result {
Ok(Some(data)) => {
tracing::info!(
execution_id,
name = %name,
duration_seconds = elapsed,
"result_store.resolve: found",
);
crate::metrics::record_result_store_resolve(elapsed, "ok");
Ok((StatusCode::OK, Json(data)).into_response())
}
Ok(None) => {
tracing::warn!(
execution_id,
name = %name,
noetl_ref = %params.r#ref,
"result_store.resolve: not found",
);
crate::metrics::record_result_store_resolve(elapsed, "not_found");
Ok((
StatusCode::NOT_FOUND,
Json(serde_json::json!({"error": "result not found"})),
)
.into_response())
}
Err(e) => {
tracing::warn!(
execution_id,
name = %name,
error = %e,
duration_seconds = elapsed,
"result_store.resolve: error",
);
crate::metrics::record_result_store_resolve(elapsed, "error");
Err(e)
}
}
}