use std::sync::atomic::Ordering;
use std::time::{Duration, Instant};
use nodedb_types::{DatabaseId, TenantId};
use crate::bridge::envelope::{Priority, Request, Response};
use crate::bridge::physical_plan::PhysicalPlan;
use crate::control::state::SharedState;
use crate::types::{ReadConsistency, RequestId, TraceId, VShardId};
pub(super) async fn dispatch_local(
state: &SharedState,
tenant_id: TenantId,
database_id: DatabaseId,
collection_qualified: &str,
plan: PhysicalPlan,
) -> crate::Result<Response> {
let req_id = RequestId::new(state.request_id_counter.fetch_add(1, Ordering::Relaxed));
let deadline_secs = state.tuning.network.default_deadline_secs;
let deadline_dur = Duration::from_secs(deadline_secs);
let vshard_id = VShardId::from_collection_in_database(database_id, collection_qualified);
let req = Request {
request_id: req_id,
tenant_id,
vshard_id,
database_id,
plan,
deadline: Instant::now() + deadline_dur,
priority: Priority::Normal,
trace_id: TraceId::ZERO,
consistency: ReadConsistency::Strong,
idempotency_key: None,
event_source: crate::event::EventSource::User,
user_roles: Vec::new(),
user_id: None,
statement_digest: None,
};
let mut rx = state.tracker.register(req_id);
match state.dispatcher.lock() {
Ok(mut d) => d.dispatch(req)?,
Err(p) => p.into_inner().dispatch(req)?,
}
tokio::time::timeout(deadline_dur, rx.recv())
.await
.map_err(|_| crate::Error::DeadlineExceeded { request_id: req_id })?
.ok_or(crate::Error::Dispatch {
detail: "clone materializer: response channel closed".into(),
})
}