use pgwire::api::results::Response;
use pgwire::error::{ErrorInfo, PgWireError, PgWireResult};
use crate::control::clone::resolver::{
CloneReadParams, ResolveOutcome, filter_tombstoned_rows, resolve_read,
};
use crate::control::planner::physical::PhysicalTask;
use crate::control::server::pgwire::handler::plan::{PlanKind, payload_to_response};
use crate::types::TenantId;
use super::kv_wrapping::maybe_wrap_kv_point_get;
use super::super::super::types::error_to_sqlstate;
use super::super::core::NodeDbPgHandler;
impl NodeDbPgHandler {
pub(super) async fn maybe_dispatch_clone_reads(
&self,
tasks: Vec<PhysicalTask>,
tenant_id: TenantId,
addr: &std::net::SocketAddr,
) -> PgWireResult<Option<Vec<Response>>> {
let (query_lsn, query_ms) =
if let Some(as_of_ms) = extract_system_as_of_ms(tasks.first().map(|t| &t.plan)) {
let lsn = self.state.ms_to_lsn(as_of_ms);
(lsn, Some(as_of_ms))
} else {
let lsn = self.state.wal.next_lsn();
let ms = self.state.ms_to_lsn_inverse(lsn);
(lsn, ms)
};
let params = CloneReadParams {
query_lsn,
query_ms,
};
let outcome = resolve_read(&self.state, tasks, tenant_id, ¶ms).map_err(|e| {
let (severity, code, message) = error_to_sqlstate(&e);
PgWireError::UserError(Box::new(ErrorInfo::new(
severity.to_owned(),
code.to_owned(),
message,
)))
})?;
match outcome {
None => Ok(None),
Some(ResolveOutcome::Passthrough(_tasks)) => {
Ok(None)
}
Some(ResolveOutcome::PreDatesClone(note)) => {
tracing::debug!(
message = note.message,
query_lsn = %note.query_lsn,
clone_created_at = %note.clone_created_at,
"clone read predates clone creation — returning empty result"
);
let empty: Vec<u8> =
nodedb_types::json_to_msgpack(&serde_json::json!([])).unwrap_or_default();
let shaped = payload_to_response(&empty, PlanKind::MultiRow);
if let Some(notice) = shaped.notice {
self.sessions.push_notice(addr, notice);
}
Ok(Some(vec![shaped.response]))
}
Some(ResolveOutcome::Augmented {
tasks,
source_start_idx,
origin: _,
target_collection_key,
note,
}) => {
if let Some(note) = note {
tracing::debug!(
message = note.message,
"clone read: T_lsn < clone_created_at (note attached)"
);
}
let (target_tasks, source_tasks) = tasks.split_at(source_start_idx);
let mut responses = Vec::with_capacity(target_tasks.len());
for task in target_tasks {
let resp = self.dispatch_task(task.clone(), None).await.map_err(|e| {
let (severity, code, message) = error_to_sqlstate(&e);
PgWireError::UserError(Box::new(ErrorInfo::new(
severity.to_owned(),
code.to_owned(),
message,
)))
})?;
responses.push(resp);
}
let tombstoned = {
let catalog_arc = self.state.credentials.catalog();
match catalog_arc.as_ref() {
Some(catalog) => catalog
.list_clone_tombstones(&target_collection_key)
.map_err(|e| {
let (severity, code, message) = error_to_sqlstate(&e);
PgWireError::UserError(Box::new(ErrorInfo::new(
severity.to_owned(),
code.to_owned(),
message,
)))
})?,
None => std::collections::HashSet::new(),
}
};
let kv_tombstoned = {
let catalog_arc = self.state.credentials.catalog();
match catalog_arc.as_ref() {
Some(catalog) => catalog
.list_kv_clone_tombstones(&target_collection_key)
.map_err(|e| {
let (severity, code, message) = error_to_sqlstate(&e);
PgWireError::UserError(Box::new(ErrorInfo::new(
severity.to_owned(),
code.to_owned(),
message,
)))
})?,
None => std::collections::HashSet::new(),
}
};
let target_count = target_tasks.len().max(1);
for (source_idx, source_task) in source_tasks.iter().enumerate() {
let response_idx = source_idx % target_count;
let source_resp = self
.dispatch_task(source_task.clone(), None)
.await
.map_err(|e| {
let (severity, code, message) = error_to_sqlstate(&e);
PgWireError::UserError(Box::new(ErrorInfo::new(
severity.to_owned(),
code.to_owned(),
message,
)))
})?;
let normalized_payload =
maybe_wrap_kv_point_get(&source_task.plan, source_resp.payload.as_ref());
let normalized_payload = wrap_single_map_as_array(normalized_payload);
let source_payload = match filter_tombstoned_rows(
&normalized_payload,
&tombstoned,
) {
Some(p) => p,
None => {
tracing::warn!(
payload_len = normalized_payload.len(),
"clone read: filter_tombstoned_rows received non-array msgpack payload after normalization — passing through unfiltered"
);
normalized_payload
}
};
let source_payload = if !kv_tombstoned.is_empty() {
match filter_kv_tombstoned_rows(&source_payload, &kv_tombstoned) {
Some(p) => p,
None => {
tracing::warn!(
payload_len = source_payload.len(),
"clone read: filter_kv_tombstoned_rows received non-array msgpack payload after normalization — passing through unfiltered"
);
source_payload
}
}
} else {
source_payload
};
if response_idx < responses.len() {
let target_payload = wrap_single_map_as_array(
responses[response_idx].payload.as_ref().to_vec(),
);
let merged = merge_msgpack_arrays(&target_payload, &source_payload)
.map_err(|e| {
let (severity, code, message) = error_to_sqlstate(&e);
PgWireError::UserError(Box::new(ErrorInfo::new(
severity.to_owned(),
code.to_owned(),
message,
)))
})?;
responses[response_idx] = crate::bridge::envelope::Response {
payload: merged.into(),
..responses[response_idx].clone()
};
} else {
responses.push(crate::bridge::envelope::Response {
payload: source_payload.into(),
..source_resp
});
}
}
let mut pg_responses = Vec::with_capacity(responses.len());
for resp in responses {
let shaped = payload_to_response(resp.payload.as_ref(), PlanKind::MultiRow);
if let Some(notice) = shaped.notice {
self.sessions.push_notice(addr, notice);
}
pg_responses.push(shaped.response);
}
Ok(Some(pg_responses))
}
}
}
}
/// Normalizes a msgpack payload so that it is always an array.
///
/// An empty payload, or one that already begins with a msgpack array header, is returned
/// unchanged. Any other payload becomes the sole element of a one-element fixarray. Judging by
/// the name this is presumably a single map/row, but any non-array value is wrapped.
fn wrap_single_map_as_array(payload: Vec<u8>) -> Vec<u8> {
    use nodedb_query::msgpack_scan;

    // msgpack fixarray header for an array holding exactly one element.
    const FIXARRAY_OF_ONE: u8 = 0x91;

    let needs_wrapping =
        !payload.is_empty() && msgpack_scan::array_header(&payload, 0).is_none();
    if !needs_wrapping {
        return payload;
    }

    let mut wrapped = Vec::with_capacity(payload.len() + 1);
    wrapped.push(FIXARRAY_OF_ONE);
    wrapped.extend(payload);
    wrapped
}
fn merge_msgpack_arrays(a: &[u8], b: &[u8]) -> crate::Result<Vec<u8>> {
use nodedb_query::msgpack_scan;
if a.is_empty() {
return Ok(b.to_vec());
}
if b.is_empty() {
return Ok(a.to_vec());
}
let (count_a, body_a_start) = msgpack_scan::array_header(a, 0).ok_or_else(|| {
crate::Error::Storage {
engine: "clone_merge".into(),
detail: format!(
"merge_msgpack_arrays: left input is not a msgpack array (len={}, first_byte=0x{:02x})",
a.len(),
a.first().copied().unwrap_or(0)
),
}
})?;
let (count_b, body_b_start) = msgpack_scan::array_header(b, 0).ok_or_else(|| {
crate::Error::Storage {
engine: "clone_merge".into(),
detail: format!(
"merge_msgpack_arrays: right input is not a msgpack array (len={}, first_byte=0x{:02x})",
b.len(),
b.first().copied().unwrap_or(0)
),
}
})?;
let total = count_a + count_b;
let body_a = &a[body_a_start..];
let body_b = &b[body_b_start..];
let mut buf = Vec::with_capacity(5 + body_a.len() + body_b.len());
if total <= 15 {
buf.push(0x90 | (total as u8));
} else if total <= 0xFFFF {
buf.push(0xdc);
buf.push((total >> 8) as u8);
buf.push(total as u8);
} else {
buf.push(0xdd);
buf.push((total >> 24) as u8);
buf.push((total >> 16) as u8);
buf.push((total >> 8) as u8);
buf.push(total as u8);
}
buf.extend_from_slice(body_a);
buf.extend_from_slice(body_b);
Ok(buf)
}
/// Removes KV rows whose `key` field appears in `tombstoned` from a msgpack array payload.
///
/// Each array element is treated as a row, and its `key` field is read as a string. Rows whose
/// key is tombstoned are dropped. Rows without a readable `key` are kept, and a warning is logged.
///
/// Returns:
/// - `Some(payload.to_vec())`, unchanged, when `payload` or `tombstoned` is empty, or when no row
///   was dropped.
/// - `Some(rebuilt)` — a new array with its header re-encoded for the surviving count, holding
///   the kept rows in their original order.
/// - `None` when the payload does not start with a msgpack array header, or when a row cannot
///   be skipped over.
fn filter_kv_tombstoned_rows(
    payload: &[u8],
    tombstoned: &std::collections::HashSet<String>,
) -> Option<Vec<u8>> {
    use nodedb_query::msgpack_scan;

    if payload.is_empty() || tombstoned.is_empty() {
        return Some(payload.to_vec());
    }

    let (row_count, mut cursor) = msgpack_scan::array_header(payload, 0)?;
    let mut survivors: Vec<&[u8]> = Vec::with_capacity(row_count);
    for _ in 0..row_count {
        let row_start = cursor;
        cursor = msgpack_scan::skip_value(payload, cursor)?;
        let row = &payload[row_start..cursor];

        let key = msgpack_scan::extract_field(row, 0, "key")
            .and_then(|(value_start, _)| msgpack_scan::read_str(row, value_start));
        let drop_row = if let Some(k) = key {
            tombstoned.contains(k)
        } else {
            tracing::warn!(
                row_len = row.len(),
                "clone read: KV row in source response has no `key` field; including unfiltered (protocol contract violation upstream)"
            );
            false
        };

        if !drop_row {
            survivors.push(row);
        }
    }

    // Nothing was dropped: return the original bytes rather than re-encoding.
    if survivors.len() == row_count {
        return Some(payload.to_vec());
    }

    let kept = survivors.len();
    let mut out = Vec::with_capacity(payload.len());
    match kept {
        0..=15 => out.push(0x90 | (kept as u8)),
        16..=0xFFFF => {
            out.push(0xdc);
            out.extend_from_slice(&(kept as u16).to_be_bytes());
        }
        _ => {
            out.push(0xdd);
            out.extend_from_slice(&(kept as u32).to_be_bytes());
        }
    }
    for row in &survivors {
        out.extend_from_slice(row);
    }
    Some(out)
}
/// Returns the `system_as_of_ms` timestamp carried by `plan`, if any.
///
/// Only document, columnar and timeseries plans are inspected, each through its per-engine
/// helper. Every other engine yields `None`, as does a missing plan. The match is deliberately
/// exhaustive, so adding a `PhysicalPlan` variant forces a decision here.
fn extract_system_as_of_ms(
    plan: Option<&crate::bridge::physical_plan::PhysicalPlan>,
) -> Option<i64> {
    use crate::bridge::physical_plan::PhysicalPlan as Plan;
    plan.and_then(|p| match p {
        Plan::Document(doc_op) => extract_doc_as_of(doc_op),
        Plan::Columnar(col_op) => extract_columnar_as_of(col_op),
        Plan::Timeseries(ts_op) => extract_timeseries_as_of(ts_op),
        Plan::Vector(_)
        | Plan::Graph(_)
        | Plan::Kv(_)
        | Plan::Text(_)
        | Plan::Spatial(_)
        | Plan::Crdt(_)
        | Plan::Query(_)
        | Plan::Meta(_)
        | Plan::Array(_)
        | Plan::ClusterArray(_) => None,
    })
}
/// Reads `system_as_of_ms` from a `DocumentOp::Scan`; any other document op yields `None`.
fn extract_doc_as_of(op: &crate::bridge::physical_plan::DocumentOp) -> Option<i64> {
    use crate::bridge::physical_plan::DocumentOp;
    if let DocumentOp::Scan {
        system_as_of_ms, ..
    } = op
    {
        *system_as_of_ms
    } else {
        None
    }
}
/// Reads `system_as_of_ms` from a `ColumnarOp::Scan`; any other columnar op yields `None`.
fn extract_columnar_as_of(op: &crate::bridge::physical_plan::ColumnarOp) -> Option<i64> {
    use crate::bridge::physical_plan::ColumnarOp;
    if let ColumnarOp::Scan {
        system_as_of_ms, ..
    } = op
    {
        *system_as_of_ms
    } else {
        None
    }
}
/// Reads `system_as_of_ms` from a `TimeseriesOp::Scan`; any other timeseries op yields `None`.
fn extract_timeseries_as_of(op: &crate::bridge::physical_plan::TimeseriesOp) -> Option<i64> {
    use crate::bridge::physical_plan::TimeseriesOp;
    if let TimeseriesOp::Scan {
        system_as_of_ms, ..
    } = op
    {
        *system_as_of_ms
    } else {
        None
    }
}