use nodedb_types::DatabaseId;
use pgwire::api::results::{Response, Tag};
use pgwire::error::{ErrorInfo, PgWireError, PgWireResult};
use crate::control::security::identity::AuthenticatedIdentity;
use crate::control::state::SharedState;
use crate::types::TraceId;
use nodedb_physical::physical_plan::MetaOp;
pub async fn handle_reindex(
state: &SharedState,
identity: &AuthenticatedIdentity,
collection: &str,
index_name: Option<&str>,
concurrent: bool,
) -> PgWireResult<Vec<Response>> {
let collection = collection.to_lowercase();
let index_name = index_name.map(str::to_lowercase);
let tenant_id = identity.tenant_id;
if let Some(catalog) = state.credentials.catalog()
&& catalog
.get_collection(DatabaseId::DEFAULT, tenant_id.as_u64(), &collection)
.ok()
.flatten()
.is_none()
{
return Err(PgWireError::UserError(Box::new(ErrorInfo::new(
"ERROR".to_owned(),
"42P01".to_owned(),
format!("collection \"{collection}\" does not exist"),
))));
}
if concurrent {
let plan = crate::bridge::envelope::PhysicalPlan::Meta(MetaOp::RebuildIndex {
collection: collection.clone(),
index_name,
concurrent: true,
});
let trace_id = TraceId::generate();
crate::control::server::broadcast::broadcast_register_to_all_cores(
state, tenant_id, plan, trace_id,
)
.await
.map_err(|e| {
PgWireError::UserError(Box::new(ErrorInfo::new(
"ERROR".to_owned(),
"XX000".to_owned(),
format!("REINDEX CONCURRENTLY failed: {e}"),
)))
})?;
tracing::info!(
%collection,
concurrent = true,
"REINDEX CONCURRENTLY dispatched and acknowledged by all cores"
);
} else {
super::distributed::dispatch_maintenance_to_all_cores(state, tenant_id, MetaOp::Checkpoint);
tracing::info!(%collection, concurrent = false, "REINDEX dispatched");
}
Ok(vec![Response::Execution(Tag::new("REINDEX"))])
}