use std::sync::Arc;
use tracing::{debug, warn};
use crate::control::planner::context::QueryContext;
use crate::control::state::SharedState;
use crate::types::TenantId;
pub struct ClusterForwarder {
shared: Arc<SharedState>,
query_ctx: Arc<QueryContext>,
}
impl ClusterForwarder {
pub fn new(shared: Arc<SharedState>, query_ctx: Arc<QueryContext>) -> Self {
Self { shared, query_ctx }
}
}
impl nodedb_cluster::RequestForwarder for ClusterForwarder {
async fn execute_forwarded(
&self,
req: nodedb_cluster::rpc_codec::ForwardRequest,
) -> nodedb_cluster::rpc_codec::ForwardResponse {
let tenant_id = TenantId::new(req.tenant_id);
let sql = &req.sql;
debug!(
tenant_id = req.tenant_id,
sql = %sql,
trace_id = req.trace_id,
"executing forwarded query"
);
let tasks = match self.query_ctx.plan_sql(sql, tenant_id).await {
Ok(tasks) => tasks,
Err(e) => {
return nodedb_cluster::rpc_codec::ForwardResponse {
success: false,
payloads: vec![],
error_message: format!("SQL planning failed: {e}"),
};
}
};
if tasks.is_empty() {
return nodedb_cluster::rpc_codec::ForwardResponse {
success: true,
payloads: vec![],
error_message: String::new(),
};
}
let mut payloads = Vec::with_capacity(tasks.len());
for task in tasks {
if let Err(e) = crate::control::server::dispatch_utils::wal_append_if_write(
&self.shared.wal,
task.tenant_id,
task.vshard_id,
&task.plan,
) {
return nodedb_cluster::rpc_codec::ForwardResponse {
success: false,
payloads,
error_message: format!("WAL append failed: {e}"),
};
}
match crate::control::server::dispatch_utils::dispatch_to_data_plane(
&self.shared,
task.tenant_id,
task.vshard_id,
task.plan,
req.trace_id,
)
.await
{
Ok(response) => {
if response.status != crate::bridge::envelope::Status::Ok {
let detail = response
.error_code
.as_ref()
.map(|c| format!("{c:?}"))
.unwrap_or_else(|| "execution error".into());
return nodedb_cluster::rpc_codec::ForwardResponse {
success: false,
payloads,
error_message: detail,
};
}
payloads.push(response.payload.as_ref().to_vec());
}
Err(e) => {
warn!(error = %e, "forwarded query dispatch failed");
return nodedb_cluster::rpc_codec::ForwardResponse {
success: false,
payloads,
error_message: format!("dispatch failed: {e}"),
};
}
}
}
nodedb_cluster::rpc_codec::ForwardResponse {
success: true,
payloads,
error_message: String::new(),
}
}
}