use std::time::{Duration, Instant};
use axum::extract::{Path, State};
use axum::http::HeaderMap;
use axum::response::IntoResponse;
use crate::bridge::envelope::{PhysicalPlan, Priority, Request, Status};
use crate::bridge::physical_plan::DocumentOp;
use crate::control::server::http::auth::{ApiError, AppState, resolve_identity};
use crate::types::{ReadConsistency, RequestId, TenantId, VShardId};
pub(super) fn extract_request_id(headers: &HeaderMap) -> u64 {
headers
.get("x-request-id")
.and_then(|v| v.to_str().ok())
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or_else(|| {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_nanos() as u64
})
}
fn collection_vshard(collection: &str) -> VShardId {
VShardId::from_collection(collection)
}
pub(super) async fn dispatch_plan(
state: &AppState,
tenant_id: TenantId,
collection: &str,
plan: PhysicalPlan,
) -> Result<Vec<u8>, ApiError> {
dispatch_plan_with_trace(state, tenant_id, collection, plan, 0).await
}
pub(super) async fn dispatch_plan_with_trace(
state: &AppState,
tenant_id: TenantId,
collection: &str,
plan: PhysicalPlan,
trace_id: u64,
) -> Result<Vec<u8>, ApiError> {
let vshard_id = collection_vshard(collection);
let request_id = RequestId::new(
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_nanos() as u64,
);
let request = Request {
request_id,
tenant_id,
vshard_id,
plan,
deadline: Instant::now()
+ Duration::from_secs(state.shared.tuning.network.default_deadline_secs),
priority: Priority::Normal,
trace_id,
consistency: ReadConsistency::Strong,
idempotency_key: None,
};
let rx = state.shared.tracker.register_oneshot(request_id);
match state.shared.dispatcher.lock() {
Ok(mut d) => d
.dispatch(request)
.map_err(|e| ApiError::Internal(e.to_string()))?,
Err(p) => p
.into_inner()
.dispatch(request)
.map_err(|e| ApiError::Internal(e.to_string()))?,
};
let resp = tokio::time::timeout(
Duration::from_secs(state.shared.tuning.network.default_deadline_secs),
rx,
)
.await
.map_err(|_| ApiError::Internal("request timed out".into()))?
.map_err(|_| ApiError::Internal("response channel closed".into()))?;
if resp.status != Status::Ok {
let detail = if let Some(ref code) = resp.error_code {
format!("{code:?}")
} else {
String::from_utf8_lossy(&resp.payload).into_owned()
};
return Err(ApiError::Internal(detail));
}
Ok(resp.payload.to_vec())
}
pub async fn insert_document(
headers: HeaderMap,
State(state): State<AppState>,
Path(collection): Path<String>,
axum::Json(body): axum::Json<serde_json::Value>,
) -> Result<impl IntoResponse, ApiError> {
let identity = resolve_identity(&headers, &state, "http")?;
state
.shared
.check_tenant_quota(identity.tenant_id)
.map_err(|e| ApiError::BadRequest(e.to_string()))?;
let document_id = match body.get("id").and_then(|v| v.as_str()) {
Some(id) => id.to_string(),
None => nodedb_types::id_gen::uuid_v7(),
};
let data = body
.get("data")
.ok_or_else(|| ApiError::BadRequest("missing 'data' field".into()))?;
if let Some(catalog) = state.shared.credentials.catalog()
&& let Ok(Some(coll)) = catalog.get_collection(identity.tenant_id.as_u32(), &collection)
&& let Err(e) = crate::control::server::pgwire::ddl::collection::validate_document_schema(
&coll.fields,
data,
)
{
return Err(ApiError::BadRequest(format!("schema validation: {e}")));
}
let value =
serde_json::to_vec(data).map_err(|e| ApiError::BadRequest(format!("invalid data: {e}")))?;
let plan = PhysicalPlan::Document(DocumentOp::PointPut {
collection: collection.clone(),
document_id: document_id.clone(),
value,
});
let trace_id = extract_request_id(&headers);
state.shared.tenant_request_start(identity.tenant_id);
let result =
dispatch_plan_with_trace(&state, identity.tenant_id, &collection, plan, trace_id).await;
state.shared.tenant_request_end(identity.tenant_id);
result?;
Ok(axum::Json(serde_json::json!({
"status": "ok",
"id": document_id,
"collection": collection,
})))
}
pub async fn get_document(
headers: HeaderMap,
State(state): State<AppState>,
Path((collection, document_id)): Path<(String, String)>,
) -> Result<impl IntoResponse, ApiError> {
let identity = resolve_identity(&headers, &state, "http")?;
let plan = PhysicalPlan::Document(DocumentOp::PointGet {
collection: collection.clone(),
document_id: document_id.clone(),
rls_filters: Vec::new(),
});
state.shared.tenant_request_start(identity.tenant_id);
let result = dispatch_plan(&state, identity.tenant_id, &collection, plan).await;
state.shared.tenant_request_end(identity.tenant_id);
let payload = result?;
if payload.is_empty() {
return Err(ApiError::BadRequest(format!(
"document '{document_id}' not found in '{collection}'"
)));
}
let data: serde_json::Value = serde_json::from_slice(&payload).unwrap_or_else(|_| {
serde_json::Value::String(String::from_utf8_lossy(&payload).into_owned())
});
Ok(axum::Json(serde_json::json!({
"status": "ok",
"id": document_id,
"collection": collection,
"data": data,
})))
}
pub async fn delete_document(
headers: HeaderMap,
State(state): State<AppState>,
Path((collection, document_id)): Path<(String, String)>,
) -> Result<impl IntoResponse, ApiError> {
let identity = resolve_identity(&headers, &state, "http")?;
let plan = PhysicalPlan::Document(DocumentOp::PointDelete {
collection: collection.clone(),
document_id: document_id.clone(),
});
state.shared.tenant_request_start(identity.tenant_id);
let result = dispatch_plan(&state, identity.tenant_id, &collection, plan).await;
state.shared.tenant_request_end(identity.tenant_id);
result?;
Ok(axum::Json(serde_json::json!({
"status": "ok",
"id": document_id,
"collection": collection,
"deleted": true,
})))
}