use std::time::Duration;
use crate::bridge::envelope::PhysicalPlan;
use crate::control::state::SharedState;
use crate::types::{DatabaseId, TenantId, TraceId};
use nodedb_physical::physical_plan::DocumentOp;
use nodedb_cluster::rpc_codec::{ExecuteRequest, RaftRpc};
use nodedb_cluster::topology::NodeState;
use nodedb_physical::physical_plan::wire as plan_wire;
use super::super::super::types::sqlstate_error;
/// Time budget (two minutes) advertised to each peer for running the backfill.
///
/// It is converted to milliseconds and sent as `deadline_remaining_ms` in the
/// `ExecuteRequest` built by `backfill_on_peers`.
const PEER_BACKFILL_DEADLINE: Duration = Duration::from_secs(2 * 60);
/// Borrowed arguments describing one index backfill to run on peer nodes.
///
/// `backfill_on_peers` copies these values into a
/// `DocumentOp::BackfillIndex` plan and the accompanying `ExecuteRequest`.
pub(super) struct PeerBackfill<'a> {
/// Tenant on whose behalf the request is sent; transmitted as a `u64`.
pub tenant_id: TenantId,
/// Name of the collection whose index is being backfilled.
pub collection: &'a str,
/// Forwarded as the `path` field of the plan.
/// NOTE(review): presumably the indexed document path — confirm against `BackfillIndex`.
pub path: &'a str,
/// Forwarded unchanged as `is_array`.
/// NOTE(review): presumably marks the path as holding array values — confirm.
pub is_array: bool,
/// Forwarded unchanged as `unique`.
/// NOTE(review): presumably requests uniqueness enforcement — confirm.
pub unique: bool,
/// Forwarded unchanged as `case_insensitive`.
pub case_insensitive: bool,
/// Optional predicate text, forwarded as an owned `String` when present.
/// NOTE(review): presumably a partial-index filter — confirm.
pub predicate: Option<&'a str>,
}
/// Fans an index backfill out to every other active node and waits for the replies.
///
/// A `DocumentOp::BackfillIndex` plan is built from `args`, encoded once, and
/// sent in an `ExecuteRequest` to each node that the topology reports as
/// `NodeState::Active`, excluding `state.node_id`. One task is spawned per
/// peer; the replies are then checked in peer order.
///
/// Returns `Ok(())` immediately when the state has no cluster transport, no
/// cluster topology, or no eligible peers.
///
/// # Errors
///
/// The first failure encountered is returned; join handles that have not been
/// awaited yet are dropped. The SQLSTATE is `XX000` for plan-encoding
/// failures, task join failures, transport failures, unexpected RPC variants,
/// and peer-reported errors, except that a peer-reported error whose formatted
/// detail contains "unique" (case-insensitively) is reported as `23505`.
///
/// The deadline is only communicated to the peer inside the request; this
/// function does not wrap the RPC in a timeout of its own.
pub(super) async fn backfill_on_peers(
    state: &SharedState,
    args: PeerBackfill<'_>,
) -> Result<(), pgwire::error::PgWireError> {
    // Without both a transport and a topology there is nothing to fan out to.
    let (Some(transport), Some(topology_lock)) = (
        state.cluster_transport.as_ref(),
        state.cluster_topology.as_ref(),
    ) else {
        return Ok(());
    };

    // Snapshot the target ids inside a short scope so the read guard is
    // released before any `.await`.
    let targets: Vec<u64> = {
        let guard = topology_lock
            .read()
            .unwrap_or_else(|poison| poison.into_inner());
        guard
            .all_nodes()
            .filter_map(|node| {
                let eligible = node.state == NodeState::Active && node.node_id != state.node_id;
                eligible.then_some(node.node_id)
            })
            .collect()
    };
    if targets.is_empty() {
        return Ok(());
    }

    // Encode the plan once; each request carries its own copy of the bytes.
    let plan = PhysicalPlan::Document(DocumentOp::BackfillIndex {
        collection: args.collection.to_owned(),
        path: args.path.to_owned(),
        is_array: args.is_array,
        unique: args.unique,
        case_insensitive: args.case_insensitive,
        predicate: args.predicate.map(str::to_owned),
    });
    let encoded = match plan_wire::encode(&plan) {
        Ok(bytes) => bytes,
        Err(e) => {
            return Err(sqlstate_error(
                "XX000",
                &format!("backfill plan encode: {e}"),
            ));
        }
    };

    // Values shared by every request.
    let deadline_ms = PEER_BACKFILL_DEADLINE.as_millis() as u64;
    let trace_id = TraceId::generate();
    let tenant = args.tenant_id.as_u64();
    let database = DatabaseId::DEFAULT.as_u64();

    // Start all RPCs before awaiting any of them so the peers work concurrently.
    let handles: Vec<_> = targets
        .into_iter()
        .map(|node_id| {
            let request = RaftRpc::ExecuteRequest(ExecuteRequest {
                plan_bytes: encoded.clone(),
                tenant_id: tenant,
                database_id: database,
                deadline_remaining_ms: deadline_ms,
                trace_id: trace_id.0,
                descriptor_versions: Vec::new(),
            });
            let peer_transport = transport.clone();
            tokio::spawn(async move {
                let outcome = peer_transport.send_rpc(node_id, request).await;
                (node_id, outcome)
            })
        })
        .collect();

    for handle in handles {
        let (node_id, outcome) = match handle.await {
            Ok(pair) => pair,
            Err(e) => {
                return Err(sqlstate_error(
                    "XX000",
                    &format!("peer backfill join: {e}"),
                ));
            }
        };

        let reply = match outcome {
            Ok(RaftRpc::ExecuteResponse(reply)) => reply,
            Ok(other) => {
                return Err(sqlstate_error(
                    "XX000",
                    &format!("peer backfill on node {node_id}: unexpected RPC variant {other:?}"),
                ));
            }
            Err(e) => {
                return Err(sqlstate_error(
                    "XX000",
                    &format!("peer backfill transport to node {node_id}: {e}"),
                ));
            }
        };

        let Some(err) = reply.error else {
            continue;
        };
        let detail = format!("peer backfill on node {node_id}: {err:?}");
        // The peer error is classified by its text: any mention of "unique"
        // is surfaced as a unique-violation SQLSTATE.
        let mentions_unique = detail.to_lowercase().contains("unique");
        let code = if mentions_unique { "23505" } else { "XX000" };
        return Err(sqlstate_error(code, &detail));
    }

    Ok(())
}