1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
// SPDX-License-Identifier: BUSL-1.1
//! Local execution of incoming `ExecuteRequest` RPCs.
//!
//! When a remote node sends an `ExecuteRequest` to this node (because this
//! node is the leader for the target vShard), the [`LocalPlanExecutor`]
//! validates descriptor versions, decodes the `PhysicalPlan`, dispatches
//! it through the local SPSC bridge, and returns an `ExecuteResponse`.
//!
//! Unlike the retired SQL-string forwarding path, this path skips planning
//! entirely — the plan is already encoded by the sender.
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
use tracing::{Instrument, info_span};
use nodedb_cluster::forward::PlanExecutor;
use nodedb_cluster::rpc_codec::{ExecuteRequest, ExecuteResponse, TypedClusterError};
use crate::bridge::envelope::{Priority, Request};
use crate::bridge::physical_plan::wire as plan_wire;
use crate::control::state::SharedState;
use crate::types::DatabaseId;
use crate::types::ReadConsistency;
/// Numeric code for `TypedClusterError::Internal` when plan bytes fail to decode.
///
/// Local alias of `nodedb_cluster::rpc_codec::PLAN_DECODE_FAILED`. Despite the
/// name, this file also attaches the same code to the other internal failures
/// it reports: catalog lookup errors, dispatch errors, an error status returned
/// by the Data Plane, and a closed response channel.
const PLAN_DECODE_FAILED: u32 = nodedb_cluster::rpc_codec::PLAN_DECODE_FAILED;
/// Executes pre-planned `PhysicalPlan` on the local Data Plane.
///
/// Implements [`PlanExecutor`], so an incoming `ExecuteRequest` is validated,
/// decoded and dispatched on this node without any planning step.
pub struct LocalPlanExecutor {
/// Shared node state. This file reads its trace exporter, network tuning,
/// credentials catalog, request-id counter, response tracker and dispatcher.
state: Arc<SharedState>,
}
impl LocalPlanExecutor {
pub fn new(state: Arc<SharedState>) -> Self {
Self { state }
}
}
impl PlanExecutor for LocalPlanExecutor {
    /// Executes `req` locally and reports the attempt to the trace exporter.
    ///
    /// The actual work happens in `execute_plan_inner`, which runs inside an
    /// `executor.execute_plan` tracing span tagged with the request's trace id
    /// and tenant id. Once it completes, one exporter span covering the whole
    /// call is emitted, carrying the response's `success` flag, and the
    /// response is returned unchanged.
    async fn execute_plan(&self, req: ExecuteRequest) -> ExecuteResponse {
        // Copy everything the exporter needs out of `req` first: the request
        // itself is moved into the inner call below.
        let trace = nodedb_types::TraceId(req.trace_id);
        let tenant = req.tenant_id;
        let trace_exporter = Arc::clone(&self.state.trace_exporter);
        let started_at = SystemTime::now();

        let exec_span = info_span!("executor.execute_plan", trace_id = %trace, tenant_id = tenant);
        let response = self.execute_plan_inner(req).instrument(exec_span).await;

        // Emit one OTLP executor span per leaseholder. All of them share the
        // request's `trace_id`, which lets the gateway's upstream span and the
        // N leaseholder spans be stitched into a single distributed trace.
        let finished_at = SystemTime::now();
        trace_exporter.emit(
            "executor.execute_plan",
            trace,
            started_at,
            finished_at,
            tenant,
            // NOTE(review): the meaning of this literal is defined by the
            // exporter's `emit` signature, which is not visible in this file.
            0,
            response.success,
        );
        response
    }
}
impl LocalPlanExecutor {
async fn execute_plan_inner(&self, req: ExecuteRequest) -> ExecuteResponse {
// ── 1. Deadline check ─────────────────────────────────────────────────
if req.deadline_remaining_ms == 0 {
return ExecuteResponse::err(TypedClusterError::DeadlineExceeded { elapsed_ms: 0 });
}
let deadline = Duration::from_millis(req.deadline_remaining_ms).min(Duration::from_secs(
self.state.tuning.network.default_deadline_secs,
));
// ── 2. Descriptor version validation ──────────────────────────────────
//
// For each (collection, version) pair the caller sent, look up the local
// descriptor version from SystemCatalog. If any version differs, the
// caller's plan was built against a stale schema — reject with a typed
// error so they re-plan against fresh leases.
let catalog_ref = self.state.credentials.catalog();
if let Some(catalog) = catalog_ref.as_ref() {
for entry in &req.descriptor_versions {
match catalog.get_collection(DatabaseId::DEFAULT, req.tenant_id, &entry.collection)
{
Ok(Some(stored)) => {
// Version 0 is the pre-B.1 sentinel; treat as 1 (same
// floor the drain gate uses).
let actual = if stored.descriptor_version == 0 {
1
} else {
stored.descriptor_version
};
if actual != entry.version {
return ExecuteResponse::err(TypedClusterError::DescriptorMismatch {
collection: entry.collection.clone(),
expected_version: entry.version,
actual_version: actual,
});
}
}
Ok(None) => {
// Collection not found locally — could be a new collection
// the follower saw but we haven't applied yet, or a race.
// Treat as DescriptorMismatch so the caller re-plans.
if entry.version != 0 {
return ExecuteResponse::err(TypedClusterError::DescriptorMismatch {
collection: entry.collection.clone(),
expected_version: entry.version,
actual_version: 0,
});
}
}
Err(e) => {
return ExecuteResponse::err(TypedClusterError::Internal {
code: PLAN_DECODE_FAILED,
message: format!("catalog lookup failed: {e}"),
});
}
}
}
}
// ── 3. Decode the PhysicalPlan ────────────────────────────────────────
let plan = match plan_wire::decode(&req.plan_bytes) {
Ok(p) => p,
Err(e) => {
return ExecuteResponse::err(TypedClusterError::Internal {
code: PLAN_DECODE_FAILED,
message: format!("plan decode failed: {e}"),
});
}
};
// ── 4. Dispatch through local SPSC bridge ─────────────────────────────
//
// Build a Request, register a oneshot tracker, dispatch, and await the response.
let request_id = self.state.next_request_id();
let tenant_id = crate::types::TenantId::new(req.tenant_id);
let database_id = crate::types::DatabaseId::from(req.database_id);
let request = Request {
request_id,
tenant_id,
database_id,
// Use the first vshard_id from the plan — the sender already routed
// this to the correct node. Use 0 as the default if the plan doesn't
// embed vshard info directly; the Data Plane ignores it for local exec.
vshard_id: crate::types::VShardId::new(0),
plan,
deadline: Instant::now() + deadline,
priority: Priority::Normal,
trace_id: nodedb_types::TraceId(req.trace_id),
consistency: ReadConsistency::Strong,
idempotency_key: None,
event_source: crate::event::EventSource::User,
user_roles: Vec::new(),
user_id: None,
statement_digest: None,
};
let mut rx = self.state.tracker.register(request_id);
let dispatch_result = match self.state.dispatcher.lock() {
Ok(mut d) => d.dispatch(request),
Err(poisoned) => poisoned.into_inner().dispatch(request),
};
if let Err(e) = dispatch_result {
return ExecuteResponse::err(TypedClusterError::Internal {
code: PLAN_DECODE_FAILED,
message: format!("dispatch failed: {e}"),
});
}
// ── 5. Collect response payloads ──────────────────────────────────────
match tokio::time::timeout(deadline, async { rx.recv().await.ok_or(()) }).await {
Ok(Ok(resp)) => {
if resp.status == crate::bridge::envelope::Status::Error {
let msg = resp
.error_code
.as_ref()
.map(|c| format!("{c:?}"))
.unwrap_or_else(|| "unknown error".into());
ExecuteResponse::err(TypedClusterError::Internal {
code: PLAN_DECODE_FAILED,
message: msg,
})
} else {
ExecuteResponse::ok(vec![resp.payload.to_vec()])
}
}
Ok(Err(_)) => ExecuteResponse::err(TypedClusterError::Internal {
code: PLAN_DECODE_FAILED,
message: "response channel closed".into(),
}),
Err(_) => ExecuteResponse::err(TypedClusterError::DeadlineExceeded {
elapsed_ms: deadline.as_millis() as u64,
}),
}
}
}