use axum::{http::StatusCode, response::IntoResponse, Json};
use crate::types::ErrorResponse;
pub(crate) struct TimeoutElapsed;
pub(crate) type SearchOutcome =
Result<velesdb_core::Result<Vec<velesdb_core::SearchResult>>, axum::response::Response>;
pub(crate) type TimedSearchOutcome = Result<SearchOutcome, TimeoutElapsed>;
#[allow(clippy::result_large_err)]
pub(crate) async fn run_search_with_optional_timeout<F>(
timeout_ms: Option<u64>,
work: F,
) -> TimedSearchOutcome
where
F: FnOnce() -> SearchOutcome + Send + 'static,
{
if matches!(timeout_ms, Some(0)) {
return Err(TimeoutElapsed);
}
let handle = tokio::task::spawn_blocking(work);
match timeout_ms {
Some(ms) => await_with_timeout(handle, ms).await,
None => Ok(unwrap_join(handle.await)),
}
}
#[allow(clippy::result_large_err)]
async fn await_with_timeout(
handle: tokio::task::JoinHandle<SearchOutcome>,
ms: u64,
) -> TimedSearchOutcome {
let duration = std::time::Duration::from_millis(ms);
match tokio::time::timeout(duration, handle).await {
Ok(join_result) => Ok(unwrap_join(join_result)),
Err(_elapsed) => Err(TimeoutElapsed),
}
}
#[allow(clippy::result_large_err)]
pub(crate) async fn run_blocking_search<F>(work: F) -> SearchOutcome
where
F: FnOnce() -> SearchOutcome + Send + 'static,
{
unwrap_join(tokio::task::spawn_blocking(work).await)
}
#[allow(clippy::result_large_err)]
fn unwrap_join(join_result: Result<SearchOutcome, tokio::task::JoinError>) -> SearchOutcome {
match join_result {
Ok(inner) => inner,
Err(join_err) => Err((
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: format!("Search worker task failed: {join_err}"),
code: Some("VELES-INTERNAL-WORKER-FAILURE".to_string()),
}),
)
.into_response()),
}
}