use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
use nodedb_cluster::forward::RequestForwarder;
use nodedb_cluster::rpc_codec::{ForwardRequest, ForwardResponse};
use crate::bridge::envelope::{Priority, Request};
use crate::control::planner::context::QueryContext;
use crate::control::state::SharedState;
use crate::types::{ReadConsistency, RequestId, TenantId};
/// Runs forwarded requests against this node: the `RequestForwarder`
/// implementation in this file plans the forwarded SQL with `query_ctx` and
/// dispatches the resulting tasks through `state`.
pub struct LocalForwarder {
/// Shared node state; this file reads its tuning configuration, response
/// tracker and dispatcher.
state: Arc<SharedState>,
/// Planner context used to turn forwarded SQL text into tasks.
query_ctx: QueryContext,
/// Monotonic counter from which request ids for dispatched tasks are drawn.
next_request_id: AtomicU64,
}
impl LocalForwarder {
    /// Creates a forwarder that plans with `query_ctx` and dispatches through
    /// `state`, with its request-id counter set to its initial value.
    pub fn new(state: Arc<SharedState>, query_ctx: QueryContext) -> Self {
        // NOTE(review): the high starting value presumably keeps ids issued
        // here apart from ids allocated elsewhere in the process; confirm.
        const FIRST_REQUEST_ID: u64 = 1_000_000_000;

        let next_request_id = AtomicU64::new(FIRST_REQUEST_ID);
        Self {
            state,
            query_ctx,
            next_request_id,
        }
    }

    /// Returns the counter's current value wrapped as a `RequestId`, then
    /// advances the counter by one.
    fn next_request_id(&self) -> RequestId {
        let raw = self.next_request_id.fetch_add(1, Ordering::Relaxed);
        RequestId::new(raw)
    }
}
impl RequestForwarder for LocalForwarder {
    /// Plans the forwarded SQL and runs the resulting tasks one after another,
    /// collecting one payload per successful task.
    ///
    /// The time budget is the smaller of the caller's `deadline_remaining_ms`
    /// and the configured `default_deadline_secs`. It is anchored once, when
    /// this call starts, and shared by planning and every task, so a
    /// multi-task plan cannot run for several times the requested deadline.
    ///
    /// Returns `success: true` with all payloads when every task completes
    /// (or when planning yields no tasks). On the first failure — planning
    /// error, dispatch error, error status, closed response channel, or
    /// deadline expiry — returns `success: false` with a message describing
    /// the failure and the payloads gathered so far.
    async fn execute_forwarded(&self, req: ForwardRequest) -> ForwardResponse {
        let tenant_id = TenantId::new(req.tenant_id);

        // Never allow more time than the locally configured default deadline.
        let deadline = Duration::from_millis(req.deadline_remaining_ms).min(Duration::from_secs(
            self.state.tuning.network.default_deadline_secs,
        ));
        // One absolute deadline for the whole forwarded request. Restarting
        // the clock per task would let N tasks take up to N times the budget.
        let deadline_at = Instant::now() + deadline;

        let tasks = match self.query_ctx.plan_sql(&req.sql, tenant_id).await {
            Ok(t) => t,
            Err(e) => {
                return ForwardResponse {
                    success: false,
                    payloads: vec![],
                    error_message: format!("plan failed: {e}"),
                };
            }
        };

        // Nothing to execute: report success with no payloads.
        if tasks.is_empty() {
            return ForwardResponse {
                success: true,
                payloads: vec![],
                error_message: String::new(),
            };
        }

        let mut payloads = Vec::with_capacity(tasks.len());
        for task in tasks {
            let request_id = self.next_request_id();
            let request = Request {
                request_id,
                tenant_id: task.tenant_id,
                vshard_id: task.vshard_id,
                plan: task.plan,
                // Every task carries the same absolute deadline.
                deadline: deadline_at,
                priority: Priority::Normal,
                trace_id: req.trace_id,
                consistency: ReadConsistency::Strong,
                idempotency_key: None,
            };

            // Register for the response before dispatching the request.
            let rx = self.state.tracker.register_oneshot(request_id);

            // A poisoned lock is recovered rather than treated as fatal. The
            // guard is a temporary of this statement, so it is released
            // before the `.await` below.
            let dispatch_result = match self.state.dispatcher.lock() {
                Ok(mut d) => d.dispatch(request),
                Err(poisoned) => poisoned.into_inner().dispatch(request),
            };
            if let Err(e) = dispatch_result {
                return ForwardResponse {
                    success: false,
                    payloads,
                    error_message: format!("dispatch failed: {e}"),
                };
            }

            // Wait only for what is left of the shared budget; this saturates
            // to zero once the deadline has already passed.
            let remaining = deadline_at.saturating_duration_since(Instant::now());
            match tokio::time::timeout(remaining, rx).await {
                Ok(Ok(resp)) => {
                    if resp.status == crate::bridge::envelope::Status::Error {
                        let err_msg = resp
                            .error_code
                            .as_ref()
                            .map(|c| format!("{c:?}"))
                            .unwrap_or_else(|| "unknown error".into());
                        return ForwardResponse {
                            success: false,
                            payloads,
                            error_message: err_msg,
                        };
                    }
                    payloads.push(resp.payload.to_vec());
                }
                // The sending half was dropped without delivering a response.
                Ok(Err(_)) => {
                    return ForwardResponse {
                        success: false,
                        payloads,
                        error_message: "response channel closed".into(),
                    };
                }
                // The shared budget ran out; report the total budget granted.
                Err(_) => {
                    return ForwardResponse {
                        success: false,
                        payloads,
                        error_message: format!("deadline exceeded ({}ms)", deadline.as_millis()),
                    };
                }
            }
        }

        ForwardResponse {
            success: true,
            payloads,
            error_message: String::new(),
        }
    }
}