use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use async_trait::async_trait;
use futures::stream::Stream;
use futures::StreamExt;
use thiserror::Error;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use super::error::MeshError;
use super::executor::{MeshQueryExecutor, QueryHandle, ResultStream, RunningQuery};
use super::planner::{ExecutionPlan, OperatorPlan};
use super::protocol::{MeshDbRequest, MeshDbResponse};
use super::query::ResultRow;
pub type ResponseStream = Pin<Box<dyn Stream<Item = MeshDbResponse> + Send>>;
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum TransportError {
#[error("no route to node {0:#x}")]
NoRoute(u64),
#[error("transport error: {0}")]
Other(String),
}
#[async_trait]
pub trait MeshDbTransport: Send + Sync {
async fn send(
&self,
node: u64,
request: MeshDbRequest,
) -> Result<ResponseStream, TransportError>;
}
pub struct FederatedMeshQueryExecutor<T: MeshDbTransport> {
transport: Arc<T>,
cache: Option<Arc<dyn super::cache::ResultCache>>,
capability_version: Option<Arc<dyn Fn() -> u64 + Send + Sync>>,
}
static FEDERATED_CALL_ID_COUNTER: AtomicU64 = AtomicU64::new(1);
impl<T: MeshDbTransport> FederatedMeshQueryExecutor<T> {
pub fn new(transport: Arc<T>) -> Self {
Self {
transport,
cache: None,
capability_version: None,
}
}
pub fn with_cache(
transport: Arc<T>,
cache: Arc<dyn super::cache::ResultCache>,
capability_version: Arc<dyn Fn() -> u64 + Send + Sync>,
) -> Self {
Self {
transport,
cache: Some(cache),
capability_version: Some(capability_version),
}
}
fn allocate_id(&self) -> u64 {
FEDERATED_CALL_ID_COUNTER.fetch_add(1, Ordering::Relaxed)
}
}
#[async_trait]
impl<T: MeshDbTransport + 'static> MeshQueryExecutor for FederatedMeshQueryExecutor<T> {
async fn execute_with(
&self,
plan: ExecutionPlan,
options: super::executor::ExecuteOptions,
) -> Result<RunningQuery, MeshError> {
let handle = QueryHandle::new(self.allocate_id());
if let (Some(cache), Some(version_fn), false) = (
self.cache.as_ref(),
self.capability_version.as_ref(),
options.bypass_cache,
) {
let version = version_fn();
if let Some(key) = super::cache::CacheKey::for_plan(&plan, version) {
if let Some(cached) = cache.get(&key) {
let rows = stream_results_cancellable(
cached.rows.into_iter().map(Ok).collect(),
handle.clone(),
);
return Ok(RunningQuery { handle, rows });
}
let drained = self
.execute_uncached_with_handle(plan.clone(), handle.clone())
.await?;
let collected = drain_rows(drained.rows).await?;
if handle.is_cancelled() {
return Err(MeshError::QueryCancelled);
}
cache.insert(
key,
super::cache::CachedResult::new(
collected.clone(),
std::time::Instant::now(),
options.cache_policy,
),
);
let rows = stream_results_cancellable(
collected.into_iter().map(Ok).collect(),
handle.clone(),
);
return Ok(RunningQuery { handle, rows });
}
}
self.execute_uncached_with_handle(plan, handle).await
}
}
impl<T: MeshDbTransport + 'static> FederatedMeshQueryExecutor<T> {
async fn execute_uncached_with_handle(
&self,
plan: ExecutionPlan,
handle: QueryHandle,
) -> Result<RunningQuery, MeshError> {
if handle.is_cancelled() {
return Err(MeshError::QueryCancelled);
}
match &plan.root.operator {
OperatorPlan::AtRead { .. }
| OperatorPlan::BetweenRead { .. }
| OperatorPlan::LatestRead { .. } => {}
OperatorPlan::LineageEmit { entries, .. } => {
use super::query::SeqNum;
let rows_vec: Vec<Result<ResultRow, MeshError>> = entries
.iter()
.map(|entry| {
Ok(ResultRow {
origin: entry.origin,
seq: entry.tip_seq.unwrap_or(SeqNum(0)),
payload: Vec::new(),
})
})
.collect();
let rows = stream_results_cancellable(rows_vec, handle.clone());
return Ok(RunningQuery { handle, rows });
}
OperatorPlan::HashJoin { .. } => {
return self.execute_hash_join_federated(plan, handle).await;
}
OperatorPlan::AggregateCount { .. } => {
return self.execute_aggregate_count_federated(plan, handle).await;
}
OperatorPlan::AggregateNumeric { .. } => {
return self.execute_aggregate_numeric_federated(plan, handle).await;
}
OperatorPlan::AggregateReduction { .. } | OperatorPlan::AggregateDistinct { .. } => {
return self.execute_aggregate_e4_federated(plan, handle).await;
}
OperatorPlan::Window { .. } => {
return self.execute_window_federated(plan, handle).await;
}
OperatorPlan::Filter { .. } => {
return self.execute_filter_federated(plan, handle).await;
}
OperatorPlan::NotYetImplemented { detail, .. } => {
return Err(MeshError::PlannerError {
detail: format!("operator not yet implemented: {detail}"),
});
}
}
let targets = plan.root.target_nodes.clone();
if targets.is_empty() {
let rows: ResultStream = Box::pin(futures::stream::empty());
return Ok(RunningQuery { handle, rows });
}
let call_id = handle.id();
let request = MeshDbRequest::Execute {
call_id,
plan: plan.clone(),
};
let mut response_stream = None;
let mut last_attempted: u64 = targets[0];
let mut last_err: Option<TransportError> = None;
for &target in &targets {
if handle.is_cancelled() {
return Err(MeshError::QueryCancelled);
}
last_attempted = target;
match self.transport.send(target, request.clone()).await {
Ok(s) => {
response_stream = Some(s);
break;
}
Err(err @ TransportError::NoRoute(_)) => {
last_err = Some(err);
continue;
}
Err(other) => {
last_err = Some(other);
continue;
}
}
}
let response_stream = match response_stream {
Some(s) => s,
None => {
let detail = last_err
.map(|e| format!("all targets failed; last error: {e}"))
.unwrap_or_else(|| "no targets reachable".to_string());
return Err(MeshError::ExecutorError {
node: last_attempted,
detail,
});
}
};
let rows = translate_responses(response_stream, handle.clone());
Ok(RunningQuery { handle, rows })
}
}
impl<T: MeshDbTransport + 'static> FederatedMeshQueryExecutor<T> {
async fn execute_hash_join_federated(
&self,
plan: ExecutionPlan,
handle: QueryHandle,
) -> Result<RunningQuery, MeshError> {
use super::planner::CostEstimate;
use super::query::{JoinKind, JoinedRowPayload, SeqNum};
let OperatorPlan::HashJoin {
left,
right,
key_mode,
kind,
strategy,
..
} = plan.root.operator
else {
unreachable!("execute_hash_join_federated dispatched on non-HashJoin");
};
let (left_running, right_running) = tokio::try_join!(
Box::pin(self.execute_uncached_with_handle(
ExecutionPlan {
root: *left,
total_cost: CostEstimate::default(),
},
handle.clone(),
)),
Box::pin(self.execute_uncached_with_handle(
ExecutionPlan {
root: *right,
total_cost: CostEstimate::default(),
},
handle.clone(),
)),
)?;
if handle.is_cancelled() {
return Err(MeshError::QueryCancelled);
}
let left_rows = drain_rows(left_running.rows).await?;
let right_rows = drain_rows(right_running.rows).await?;
if handle.is_cancelled() {
return Err(MeshError::QueryCancelled);
}
let pairs = match (strategy, kind) {
(super::planner::JoinStrategy::HashBroadcast, JoinKind::Inner) => {
federated_hash_join(left_rows, right_rows, &key_mode, false, false)?
}
(super::planner::JoinStrategy::HashBroadcast, JoinKind::LeftOuter) => {
federated_hash_join(left_rows, right_rows, &key_mode, true, false)?
}
(super::planner::JoinStrategy::HashBroadcast, JoinKind::RightOuter) => {
federated_hash_join(right_rows, left_rows, &key_mode, true, true)?
}
(super::planner::JoinStrategy::HashBroadcast, JoinKind::FullOuter) => {
federated_full_outer(left_rows, right_rows, &key_mode)?
}
(super::planner::JoinStrategy::SortMerge, k) => {
federated_sort_merge(left_rows, right_rows, &key_mode, k)?
}
};
let mut out: Vec<Result<ResultRow, MeshError>> = Vec::new();
for (l, r) in pairs {
let payload =
postcard::to_allocvec(&JoinedRowPayload { left: l, right: r }).map_err(|e| {
MeshError::ExecutorError {
node: 0,
detail: format!("encode JoinedRowPayload: {e}"),
}
})?;
out.push(Ok(ResultRow {
origin: 0,
seq: SeqNum(0),
payload,
}));
}
let rows = stream_results_cancellable(out, handle.clone());
Ok(RunningQuery { handle, rows })
}
}
type JoinedPair = (Option<ResultRow>, Option<ResultRow>);
impl<T: MeshDbTransport + 'static> FederatedMeshQueryExecutor<T> {
async fn execute_filter_federated(
&self,
plan: ExecutionPlan,
handle: QueryHandle,
) -> Result<RunningQuery, MeshError> {
use super::planner::CostEstimate;
use crate::adapter::net::behavior::predicate::EvalContext;
let OperatorPlan::Filter { input, predicate } = plan.root.operator else {
unreachable!("execute_filter_federated dispatched on non-Filter");
};
let inner = Box::pin(self.execute_uncached_with_handle(
ExecutionPlan {
root: *input,
total_cost: CostEstimate::default(),
},
handle.clone(),
))
.await?;
let rows = drain_rows(inner.rows).await?;
if handle.is_cancelled() {
return Err(MeshError::QueryCancelled);
}
let pred = predicate
.into_predicate()
.map_err(|e| MeshError::PlannerError {
detail: format!("Filter predicate rebuild failed: {e:?}"),
})?;
let mut out: Vec<Result<ResultRow, MeshError>> = Vec::with_capacity(rows.len());
for row in rows {
let (tags, metadata) = super::row::synthetic_row_view(&row);
let ctx = EvalContext::new(&tags, &metadata);
if pred.evaluate(&ctx) {
out.push(Ok(row));
}
}
let rows = stream_results_cancellable(out, handle.clone());
Ok(RunningQuery { handle, rows })
}
async fn execute_window_federated(
&self,
plan: ExecutionPlan,
handle: QueryHandle,
) -> Result<RunningQuery, MeshError> {
use super::executor::execute_window;
use super::planner::CostEstimate;
let OperatorPlan::Window { input, spec } = plan.root.operator else {
unreachable!("execute_window_federated dispatched on non-Window");
};
let inner = Box::pin(self.execute_uncached_with_handle(
ExecutionPlan {
root: *input,
total_cost: CostEstimate::default(),
},
handle.clone(),
))
.await?;
let rows = drain_rows(inner.rows).await?;
if handle.is_cancelled() {
return Err(MeshError::QueryCancelled);
}
let output_rows = execute_window(rows, &spec)?;
let out: Vec<Result<ResultRow, MeshError>> = output_rows.into_iter().map(Ok).collect();
let rows = stream_results_cancellable(out, handle.clone());
Ok(RunningQuery { handle, rows })
}
async fn execute_aggregate_e4_federated(
&self,
plan: ExecutionPlan,
handle: QueryHandle,
) -> Result<RunningQuery, MeshError> {
use super::executor::{execute_aggregate_distinct, execute_aggregate_reduction};
use super::planner::CostEstimate;
let output_rows = match plan.root.operator {
OperatorPlan::AggregateReduction {
input,
group_by,
field_path,
kind,
} => {
let inner = Box::pin(self.execute_uncached_with_handle(
ExecutionPlan {
root: *input,
total_cost: CostEstimate::default(),
},
handle.clone(),
))
.await?;
let rows = drain_rows(inner.rows).await?;
if handle.is_cancelled() {
return Err(MeshError::QueryCancelled);
}
execute_aggregate_reduction(&rows, group_by.as_ref(), &field_path, kind)?
}
OperatorPlan::AggregateDistinct {
input,
group_by,
field_path,
} => {
let inner = Box::pin(self.execute_uncached_with_handle(
ExecutionPlan {
root: *input,
total_cost: CostEstimate::default(),
},
handle.clone(),
))
.await?;
let rows = drain_rows(inner.rows).await?;
if handle.is_cancelled() {
return Err(MeshError::QueryCancelled);
}
execute_aggregate_distinct(&rows, group_by.as_ref(), &field_path)?
}
_ => unreachable!("execute_aggregate_e4_federated dispatched on wrong operator"),
};
let out: Vec<Result<ResultRow, MeshError>> = output_rows.into_iter().map(Ok).collect();
let rows = stream_results_cancellable(out, handle.clone());
Ok(RunningQuery { handle, rows })
}
async fn execute_aggregate_numeric_federated(
&self,
plan: ExecutionPlan,
handle: QueryHandle,
) -> Result<RunningQuery, MeshError> {
use super::planner::CostEstimate;
use super::query::{
AggregateRowPayload, AggregateValue, GroupKey, NumericAggregateKind, SeqNum,
};
use std::collections::BTreeMap;
let OperatorPlan::AggregateNumeric {
input,
group_by,
field_path,
kind,
} = plan.root.operator
else {
unreachable!("execute_aggregate_numeric_federated dispatched on non-AggregateNumeric");
};
let inner = Box::pin(self.execute_uncached_with_handle(
ExecutionPlan {
root: *input,
total_cost: CostEstimate::default(),
},
handle.clone(),
))
.await?;
let rows = drain_rows(inner.rows).await?;
if handle.is_cancelled() {
return Err(MeshError::QueryCancelled);
}
let mut acc: BTreeMap<Vec<u8>, (Option<GroupKey>, f64, u64)> = BTreeMap::new();
for row in &rows {
let Some(value) = super::row::extract_numeric(row, &field_path) else {
continue;
};
let (key_bytes, group) = match &group_by {
None => (Vec::new(), None),
Some(mode) => {
let Some(bytes) = try_encode_join_key_federated(row, mode) else {
continue;
};
let group = match mode {
super::planner::JoinKeyMode::Origin => GroupKey::Origin(row.origin),
super::planner::JoinKeyMode::Seq => GroupKey::Seq(row.seq),
super::planner::JoinKeyMode::OriginSeq => GroupKey::OriginSeq {
origin: row.origin,
seq: row.seq,
},
super::planner::JoinKeyMode::Field(_) => unreachable!(
"JoinKeyMode::Field reached federated_aggregate_numeric; payload-keyed group_by is not supported",
),
};
(bytes, Some(group))
}
};
let entry = acc.entry(key_bytes).or_insert((group, 0.0, 0));
entry.1 += value;
entry.2 = entry.2.saturating_add(1);
}
let mut out: Vec<Result<ResultRow, MeshError>> = Vec::new();
let mk_value = |sum: f64, count: u64| match kind {
NumericAggregateKind::Sum => AggregateValue::Sum(sum),
NumericAggregateKind::Avg => {
if count == 0 {
AggregateValue::Avg(None)
} else {
AggregateValue::Avg(Some(sum / count as f64))
}
}
};
if group_by.is_none() {
let (sum, count) = acc
.get(&Vec::<u8>::new())
.map(|(_, s, c)| (*s, *c))
.unwrap_or((0.0, 0));
let payload = postcard::to_allocvec(&AggregateRowPayload {
group: None,
value: mk_value(sum, count),
})
.map_err(|e| MeshError::ExecutorError {
node: 0,
detail: format!("encode AggregateRowPayload: {e}"),
})?;
out.push(Ok(ResultRow {
origin: 0,
seq: SeqNum(0),
payload,
}));
} else {
for (_, (group, sum, count)) in acc {
let payload = postcard::to_allocvec(&AggregateRowPayload {
group,
value: mk_value(sum, count),
})
.map_err(|e| MeshError::ExecutorError {
node: 0,
detail: format!("encode AggregateRowPayload: {e}"),
})?;
out.push(Ok(ResultRow {
origin: 0,
seq: SeqNum(0),
payload,
}));
}
}
let rows_out = stream_results_cancellable(out, handle.clone());
Ok(RunningQuery {
handle,
rows: rows_out,
})
}
async fn execute_aggregate_count_federated(
&self,
plan: ExecutionPlan,
handle: QueryHandle,
) -> Result<RunningQuery, MeshError> {
use super::planner::CostEstimate;
use super::query::{AggregateRowPayload, AggregateValue, GroupKey, SeqNum};
use std::collections::BTreeMap;
let OperatorPlan::AggregateCount { input, group_by } = plan.root.operator else {
unreachable!("execute_aggregate_count_federated dispatched on non-AggregateCount");
};
let inner = Box::pin(self.execute_uncached_with_handle(
ExecutionPlan {
root: *input,
total_cost: CostEstimate::default(),
},
handle.clone(),
))
.await?;
let rows = drain_rows(inner.rows).await?;
if handle.is_cancelled() {
return Err(MeshError::QueryCancelled);
}
let mut out: Vec<Result<ResultRow, MeshError>> = Vec::new();
match group_by {
None => {
let payload = postcard::to_allocvec(&AggregateRowPayload {
group: None,
value: AggregateValue::Count(rows.len() as u64),
})
.map_err(|e| MeshError::ExecutorError {
node: 0,
detail: format!("encode AggregateRowPayload: {e}"),
})?;
out.push(Ok(ResultRow {
origin: 0,
seq: SeqNum(0),
payload,
}));
}
Some(mode) => {
let mut counts: BTreeMap<Vec<u8>, (GroupKey, u64)> = BTreeMap::new();
for row in &rows {
let Some(key_bytes) = try_encode_join_key_federated(row, &mode) else {
continue;
};
let key = match &mode {
super::planner::JoinKeyMode::Origin => GroupKey::Origin(row.origin),
super::planner::JoinKeyMode::Seq => GroupKey::Seq(row.seq),
super::planner::JoinKeyMode::OriginSeq => GroupKey::OriginSeq {
origin: row.origin,
seq: row.seq,
},
super::planner::JoinKeyMode::Field(_) => unreachable!(
"JoinKeyMode::Field reached federated_aggregate_count; payload-keyed group_by is not supported",
),
};
let entry = counts.entry(key_bytes).or_insert((key, 0));
entry.1 = entry.1.saturating_add(1);
}
for (_, (group, count)) in counts {
let payload = postcard::to_allocvec(&AggregateRowPayload {
group: Some(group),
value: AggregateValue::Count(count),
})
.map_err(|e| MeshError::ExecutorError {
node: 0,
detail: format!("encode AggregateRowPayload: {e}"),
})?;
out.push(Ok(ResultRow {
origin: 0,
seq: SeqNum(0),
payload,
}));
}
}
}
let rows_out = stream_results_cancellable(out, handle.clone());
Ok(RunningQuery {
handle,
rows: rows_out,
})
}
}
fn federated_hash_join(
build_rows: Vec<ResultRow>,
probe_rows: Vec<ResultRow>,
key_mode: &super::planner::JoinKeyMode,
emit_unmatched_build: bool,
swap: bool,
) -> Result<Vec<JoinedPair>, MeshError> {
let mut build =
super::executor::build_hash_join_table(build_rows, key_mode, "broadcast-hash-federated")?;
let mut out = Vec::new();
for p in probe_rows {
let Some(key) = try_encode_join_key_federated(&p, key_mode) else {
continue;
};
if let Some(entries) = build.get_mut(&key) {
for (b, matched) in entries.iter_mut() {
*matched = true;
if swap {
out.push((Some(p.clone()), Some(b.clone())));
} else {
out.push((Some(b.clone()), Some(p.clone())));
}
}
}
}
if emit_unmatched_build {
for entries in build.into_values() {
for (b, matched) in entries {
if !matched {
if swap {
out.push((None, Some(b)));
} else {
out.push((Some(b), None));
}
}
}
}
}
Ok(out)
}
fn federated_full_outer(
left_rows: Vec<ResultRow>,
right_rows: Vec<ResultRow>,
key_mode: &super::planner::JoinKeyMode,
) -> Result<Vec<JoinedPair>, MeshError> {
let mut right_map =
super::executor::build_hash_join_table(right_rows, key_mode, "broadcast-hash-federated")?;
let mut out = Vec::new();
for l in left_rows {
let Some(key) = try_encode_join_key_federated(&l, key_mode) else {
out.push((Some(l), None));
continue;
};
match right_map.get_mut(&key) {
Some(entries) => {
for (r, matched) in entries.iter_mut() {
*matched = true;
out.push((Some(l.clone()), Some(r.clone())));
}
}
None => out.push((Some(l), None)),
}
}
for entries in right_map.into_values() {
for (r, matched) in entries {
if !matched {
out.push((None, Some(r)));
}
}
}
Ok(out)
}
fn federated_sort_merge(
left_rows: Vec<ResultRow>,
right_rows: Vec<ResultRow>,
key_mode: &super::planner::JoinKeyMode,
kind: super::query::JoinKind,
) -> Result<Vec<JoinedPair>, MeshError> {
use super::query::JoinKind;
let mut left: Vec<(Vec<u8>, ResultRow)> = left_rows
.into_iter()
.filter_map(|r| try_encode_join_key_federated(&r, key_mode).map(|k| (k, r)))
.collect();
let mut right: Vec<(Vec<u8>, ResultRow)> = right_rows
.into_iter()
.filter_map(|r| try_encode_join_key_federated(&r, key_mode).map(|k| (k, r)))
.collect();
left.sort_by(|a, b| a.0.cmp(&b.0));
right.sort_by(|a, b| a.0.cmp(&b.0));
let emit_l = matches!(kind, JoinKind::LeftOuter | JoinKind::FullOuter);
let emit_r = matches!(kind, JoinKind::RightOuter | JoinKind::FullOuter);
let mut out = Vec::new();
let (mut li, mut ri) = (0usize, 0usize);
while li < left.len() && ri < right.len() {
match left[li].0.cmp(&right[ri].0) {
std::cmp::Ordering::Less => {
if emit_l {
out.push((Some(left[li].1.clone()), None));
}
li += 1;
}
std::cmp::Ordering::Greater => {
if emit_r {
out.push((None, Some(right[ri].1.clone())));
}
ri += 1;
}
std::cmp::Ordering::Equal => {
let key = left[li].0.clone();
let mut lj = li;
while lj < left.len() && left[lj].0 == key {
lj += 1;
}
let mut rj = ri;
while rj < right.len() && right[rj].0 == key {
rj += 1;
}
for l in &left[li..lj] {
for r in &right[ri..rj] {
out.push((Some(l.1.clone()), Some(r.1.clone())));
}
}
li = lj;
ri = rj;
}
}
}
if emit_l {
for (_, l) in &left[li..] {
out.push((Some(l.clone()), None));
}
}
if emit_r {
for (_, r) in &right[ri..] {
out.push((None, Some(r.clone())));
}
}
Ok(out)
}
const AGGREGATE_MAX_BYTES: usize = 256 * 1024 * 1024;
fn approximate_row_bytes(row: &ResultRow) -> usize {
row.payload.len().saturating_add(64)
}
async fn drain_rows(mut s: ResultStream) -> Result<Vec<ResultRow>, MeshError> {
const DRAIN_INITIAL_CAPACITY: usize = 128;
let mut out = Vec::with_capacity(DRAIN_INITIAL_CAPACITY);
let mut bytes: usize = 0;
while let Some(item) = s.next().await {
let row = item?;
bytes = bytes.saturating_add(approximate_row_bytes(&row));
if bytes > AGGREGATE_MAX_BYTES {
return Err(MeshError::QueryBudgetExceeded {
metric: super::error::BudgetMetric::MaxBytesScanned,
used: bytes as u64,
limit: AGGREGATE_MAX_BYTES as u64,
});
}
out.push(row);
}
Ok(out)
}
fn stream_results_cancellable(
rows: Vec<Result<ResultRow, MeshError>>,
handle: QueryHandle,
) -> ResultStream {
let stream = futures::stream::iter(rows).map(move |item| {
if handle.is_cancelled() {
Err(MeshError::QueryCancelled)
} else {
item
}
});
Box::pin(stream)
}
fn try_encode_join_key_federated(
row: &ResultRow,
mode: &super::planner::JoinKeyMode,
) -> Option<Vec<u8>> {
use super::planner::JoinKeyMode;
match mode {
JoinKeyMode::Origin => Some(row.origin.to_le_bytes().to_vec()),
JoinKeyMode::Seq => Some(row.seq.0.to_le_bytes().to_vec()),
JoinKeyMode::OriginSeq => {
let mut v = Vec::with_capacity(16);
v.extend_from_slice(&row.origin.to_le_bytes());
v.extend_from_slice(&row.seq.0.to_le_bytes());
Some(v)
}
JoinKeyMode::Field(path) => match path.as_str() {
"origin" => Some(row.origin.to_le_bytes().to_vec()),
"seq" => Some(row.seq.0.to_le_bytes().to_vec()),
"origin,seq" => {
let mut v = Vec::with_capacity(16);
v.extend_from_slice(&row.origin.to_le_bytes());
v.extend_from_slice(&row.seq.0.to_le_bytes());
Some(v)
}
_ => super::row::extract_string_projection(row, path).map(String::into_bytes),
},
}
}
fn translate_responses(mut response_stream: ResponseStream, handle: QueryHandle) -> ResultStream {
let (tx, rx) = mpsc::channel::<Result<ResultRow, MeshError>>(64);
tokio::spawn(async move {
while let Some(response) = response_stream.next().await {
if handle.is_cancelled() {
let _ = tx.send(Err(MeshError::QueryCancelled)).await;
return;
}
match response {
MeshDbResponse::Batch { batch, .. } => {
let is_final = batch.r#final;
for row in batch.rows {
if tx.send(Ok(row)).await.is_err() {
return;
}
}
if is_final {
return;
}
}
MeshDbResponse::End { .. } => return,
MeshDbResponse::Error { error, .. } => {
let _ = tx.send(Err(error)).await;
return;
}
}
}
let _ = tx
.send(Err(MeshError::ExecutorError {
node: 0,
detail: "transport stream ended before terminal frame".to_string(),
}))
.await;
});
Box::pin(ReceiverStream::new(rx))
}
pub struct LoopbackTransport {
nodes: parking_lot::RwLock<std::collections::HashMap<u64, LoopbackNode>>,
}
struct LoopbackNode {
executor: Arc<dyn MeshQueryExecutor>,
online: bool,
}
impl Default for LoopbackTransport {
fn default() -> Self {
Self::new()
}
}
impl LoopbackTransport {
pub fn new() -> Self {
Self {
nodes: parking_lot::RwLock::new(std::collections::HashMap::new()),
}
}
pub fn register(&self, node_id: u64, executor: Arc<dyn MeshQueryExecutor>) {
self.nodes.write().insert(
node_id,
LoopbackNode {
executor,
online: true,
},
);
}
pub fn set_offline(&self, node_id: u64, offline: bool) {
if let Some(n) = self.nodes.write().get_mut(&node_id) {
n.online = !offline;
}
}
}
#[async_trait]
impl MeshDbTransport for LoopbackTransport {
async fn send(
&self,
node: u64,
request: MeshDbRequest,
) -> Result<ResponseStream, TransportError> {
let exec = {
let guard = self.nodes.read();
let entry = guard.get(&node).ok_or(TransportError::NoRoute(node))?;
if !entry.online {
return Err(TransportError::NoRoute(node));
}
entry.executor.clone()
};
match request {
MeshDbRequest::Execute { call_id, plan } => {
let running = exec
.execute(plan)
.await
.map_err(|e| TransportError::Other(format!("remote execute failed: {e}")))?;
let stream = row_stream_to_responses(running.rows, call_id);
Ok(stream)
}
MeshDbRequest::Resume { .. } => Err(TransportError::Other(
"Resume not yet implemented in LoopbackTransport (Phase B-4+)".to_string(),
)),
MeshDbRequest::Cancel { .. } => {
let empty: ResponseStream = Box::pin(futures::stream::empty());
Ok(empty)
}
}
}
}
fn row_stream_to_responses(mut rows: ResultStream, call_id: u64) -> ResponseStream {
use super::protocol::{MeshDbResponse, ResultBatch};
let (tx, rx) = mpsc::channel::<MeshDbResponse>(64);
tokio::spawn(async move {
while let Some(item) = rows.next().await {
match item {
Ok(row) => {
let resp = MeshDbResponse::Batch {
call_id,
batch: ResultBatch::chunk(vec![row]),
};
if tx.send(resp).await.is_err() {
return;
}
}
Err(error) => {
let _ = tx.send(MeshDbResponse::Error { call_id, error }).await;
return;
}
}
}
let _ = tx.send(MeshDbResponse::End { call_id }).await;
});
Box::pin(ReceiverStream::new(rx))
}
#[cfg(test)]
mod tests {
#![allow(
clippy::disallowed_methods,
reason = "test code legitimately uses std::sync::{Mutex,RwLock} for SUT setup; tests have no real poison concern"
)]
use std::collections::BTreeMap;
use std::sync::Mutex;
use super::super::executor::{ChainReader, LocalMeshQueryExecutor};
use super::super::planner::{CostEstimate, OperatorNode};
use super::super::query::SeqNum;
use super::*;
#[derive(Default)]
struct InMemoryChainReader {
chains: Mutex<BTreeMap<u64, BTreeMap<SeqNum, Vec<u8>>>>,
}
impl InMemoryChainReader {
fn append(&self, origin: u64, seq: SeqNum, payload: Vec<u8>) {
self.chains
.lock()
.unwrap()
.entry(origin)
.or_default()
.insert(seq, payload);
}
}
impl ChainReader for InMemoryChainReader {
fn read_one(&self, origin: u64, seq: SeqNum) -> Option<Vec<u8>> {
self.chains.lock().unwrap().get(&origin)?.get(&seq).cloned()
}
fn read_range(&self, origin: u64, start: SeqNum, end: SeqNum) -> Vec<(SeqNum, Vec<u8>)> {
self.chains
.lock()
.unwrap()
.get(&origin)
.map(|chain| {
chain
.range(start..end)
.map(|(s, p)| (*s, p.clone()))
.collect()
})
.unwrap_or_default()
}
fn latest_seq(&self, origin: u64) -> Option<SeqNum> {
self.chains
.lock()
.unwrap()
.get(&origin)?
.keys()
.next_back()
.copied()
}
}
fn local_executor_with(
rows: &[(u64, u64, &[u8])],
) -> Arc<LocalMeshQueryExecutor<InMemoryChainReader>> {
let reader = Arc::new(InMemoryChainReader::default());
for (origin, seq, payload) in rows {
reader.append(*origin, SeqNum(*seq), payload.to_vec());
}
Arc::new(LocalMeshQueryExecutor::new(reader))
}
fn plan_latest(origin: u64, target_nodes: Vec<u64>) -> ExecutionPlan {
ExecutionPlan {
root: OperatorNode {
operator: OperatorPlan::LatestRead { origin },
target_nodes,
cost: CostEstimate::default(),
},
total_cost: CostEstimate::default(),
}
}
fn plan_between(origin: u64, start: u64, end: u64, target_nodes: Vec<u64>) -> ExecutionPlan {
ExecutionPlan {
root: OperatorNode {
operator: OperatorPlan::BetweenRead {
origin,
start: SeqNum(start),
end: SeqNum(end),
},
target_nodes,
cost: CostEstimate::default(),
},
total_cost: CostEstimate::default(),
}
}
async fn collect_rows(rs: ResultStream) -> Vec<Result<ResultRow, MeshError>> {
rs.collect::<Vec<_>>().await
}
#[tokio::test]
async fn three_node_happy_path_routes_to_first_holder() {
let chain = 0xCAFE_BABE_DEAD_BEEF;
let node_a =
local_executor_with(&[(chain, 1, b"a-1"), (chain, 2, b"a-2"), (chain, 3, b"a-3")]);
let node_b = local_executor_with(&[]);
let node_c = local_executor_with(&[]);
let transport = Arc::new(LoopbackTransport::new());
transport.register(0xA, node_a);
transport.register(0xB, node_b);
transport.register(0xC, node_c);
let fed = FederatedMeshQueryExecutor::new(transport);
let plan = plan_latest(chain, vec![0xA, 0xB, 0xC]);
let running = fed.execute(plan).await.unwrap();
let rows: Vec<_> = collect_rows(running.rows)
.await
.into_iter()
.map(|r| r.unwrap())
.collect();
assert_eq!(rows.len(), 1);
assert_eq!(rows[0].seq, SeqNum(3));
assert_eq!(rows[0].payload, b"a-3");
}
#[tokio::test]
async fn between_streams_all_rows_through_transport() {
let chain = 0x01;
let node = local_executor_with(&[
(chain, 1, b"p-1"),
(chain, 2, b"p-2"),
(chain, 3, b"p-3"),
(chain, 4, b"p-4"),
]);
let transport = Arc::new(LoopbackTransport::new());
transport.register(0xAA, node);
let fed = FederatedMeshQueryExecutor::new(transport);
let plan = plan_between(chain, 1, 5, vec![0xAA]);
let running = fed.execute(plan).await.unwrap();
let rows: Vec<_> = collect_rows(running.rows)
.await
.into_iter()
.map(|r| r.unwrap())
.collect();
let seqs: Vec<u64> = rows.iter().map(|r| r.seq.0).collect();
assert_eq!(seqs, vec![1, 2, 3, 4]);
}
#[tokio::test]
async fn failover_skips_offline_target() {
let chain = 0xBEEF;
let node_a = local_executor_with(&[]);
let node_b = local_executor_with(&[]);
let node_c = local_executor_with(&[(chain, 7, b"c-7")]);
let transport = Arc::new(LoopbackTransport::new());
transport.register(0xA, node_a);
transport.register(0xB, node_b);
transport.register(0xC, node_c);
transport.set_offline(0xA, true);
transport.set_offline(0xB, true);
let fed = FederatedMeshQueryExecutor::new(transport);
let plan = plan_latest(chain, vec![0xA, 0xB, 0xC]);
let running = fed.execute(plan).await.unwrap();
let rows: Vec<_> = collect_rows(running.rows)
.await
.into_iter()
.map(|r| r.unwrap())
.collect();
assert_eq!(rows.len(), 1);
assert_eq!(rows[0].seq, SeqNum(7));
assert_eq!(rows[0].payload, b"c-7");
}
#[tokio::test]
async fn all_targets_offline_surfaces_executor_error() {
let chain = 0xBEEF;
let node_a = local_executor_with(&[]);
let node_b = local_executor_with(&[]);
let transport = Arc::new(LoopbackTransport::new());
transport.register(0xA, node_a);
transport.register(0xB, node_b);
transport.set_offline(0xA, true);
transport.set_offline(0xB, true);
let fed = FederatedMeshQueryExecutor::new(transport);
let plan = plan_latest(chain, vec![0xA, 0xB]);
let err = fed.execute(plan).await.unwrap_err();
match err {
MeshError::ExecutorError { node, detail } => {
assert_eq!(node, 0xB);
assert!(detail.contains("all targets failed"), "got: {detail}");
}
other => panic!("expected ExecutorError, got {other:?}"),
}
}
#[tokio::test]
async fn unregistered_target_falls_through_no_route() {
let chain = 0xBEEF;
let node = local_executor_with(&[(chain, 5, b"five")]);
let transport = Arc::new(LoopbackTransport::new());
transport.register(0xB, node);
let fed = FederatedMeshQueryExecutor::new(transport);
let plan = plan_latest(chain, vec![0xA, 0xB]);
let running = fed.execute(plan).await.unwrap();
let rows: Vec<_> = collect_rows(running.rows)
.await
.into_iter()
.map(|r| r.unwrap())
.collect();
assert_eq!(rows.len(), 1);
assert_eq!(rows[0].seq, SeqNum(5));
}
#[tokio::test]
async fn empty_target_nodes_yields_empty_stream() {
let transport = Arc::new(LoopbackTransport::new());
let fed = FederatedMeshQueryExecutor::new(transport);
let plan = plan_latest(0xBEEF, vec![]);
let running = fed.execute(plan).await.unwrap();
let rows = collect_rows(running.rows).await;
assert!(rows.is_empty());
}
#[tokio::test]
async fn not_yet_implemented_surfaces_planner_error_before_transport() {
let transport = Arc::new(LoopbackTransport::new());
let fed = FederatedMeshQueryExecutor::new(transport);
let plan = ExecutionPlan {
root: OperatorNode {
operator: OperatorPlan::NotYetImplemented {
detail: "Join (Phase D)".to_string(),
input: None,
},
target_nodes: vec![0xA],
cost: CostEstimate::default(),
},
total_cost: CostEstimate::default(),
};
let err = fed.execute(plan).await.unwrap_err();
match err {
MeshError::PlannerError { detail } => {
assert!(detail.contains("Join (Phase D)"), "got: {detail}");
}
other => panic!("expected PlannerError, got {other:?}"),
}
}
#[tokio::test]
async fn cancel_before_first_response_short_circuits_stream() {
let chain = 0xFEED;
let node =
local_executor_with(&[(chain, 1, b"p-1"), (chain, 2, b"p-2"), (chain, 3, b"p-3")]);
let transport = Arc::new(LoopbackTransport::new());
transport.register(0xA, node);
let fed = FederatedMeshQueryExecutor::new(transport);
let plan = plan_between(chain, 1, 4, vec![0xA]);
let running = fed.execute(plan).await.unwrap();
running.handle.cancel();
let rows = collect_rows(running.rows).await;
assert!(
rows.iter()
.any(|r| matches!(r, Err(MeshError::QueryCancelled))),
"expected at least one QueryCancelled, got {rows:?}"
);
}
#[tokio::test]
async fn lineage_emit_runs_locally_without_transport_dispatch() {
use super::super::planner::{LineageDirection, LineageEntry};
let transport = Arc::new(LoopbackTransport::new());
let fed = FederatedMeshQueryExecutor::new(transport);
let plan = ExecutionPlan {
root: OperatorNode {
operator: OperatorPlan::LineageEmit {
origin: 0xAA,
direction: LineageDirection::Forward,
entries: vec![
LineageEntry {
origin: 0xAA,
depth: 0,
tip_seq: Some(SeqNum(1)),
},
LineageEntry {
origin: 0xBB,
depth: 1,
tip_seq: None,
},
],
},
target_nodes: vec![],
cost: CostEstimate::default(),
},
total_cost: CostEstimate::default(),
};
let running = fed.execute(plan).await.unwrap();
let rows: Vec<ResultRow> = collect_rows(running.rows)
.await
.into_iter()
.map(|r| r.unwrap())
.collect();
assert_eq!(rows.len(), 2);
assert_eq!(rows[0].origin, 0xAA);
assert_eq!(rows[0].seq, SeqNum(1));
assert_eq!(rows[1].origin, 0xBB);
assert_eq!(rows[1].seq, SeqNum(0));
}
#[tokio::test]
async fn federated_hash_join_fetches_both_sides_and_emits_pairs() {
use super::super::planner::{CostEstimate, JoinKeyMode};
use super::super::query::{JoinKind, JoinedRowPayload};
use std::time::Duration;
let a = 0x111;
let b = 0x222;
let node_a = local_executor_with(&[(a, 1, b"a-1"), (a, 2, b"a-2"), (a, 5, b"a-5")]);
let node_b = local_executor_with(&[(b, 2, b"b-2"), (b, 3, b"b-3"), (b, 5, b"b-5")]);
let transport = Arc::new(LoopbackTransport::new());
transport.register(0xA, node_a);
transport.register(0xB, node_b);
let fed = FederatedMeshQueryExecutor::new(transport);
let plan = ExecutionPlan {
root: OperatorNode {
operator: OperatorPlan::HashJoin {
left: Box::new(OperatorNode {
operator: OperatorPlan::BetweenRead {
origin: a,
start: SeqNum(1),
end: SeqNum(10),
},
target_nodes: vec![0xA],
cost: CostEstimate::default(),
}),
right: Box::new(OperatorNode {
operator: OperatorPlan::BetweenRead {
origin: b,
start: SeqNum(1),
end: SeqNum(10),
},
target_nodes: vec![0xB],
cost: CostEstimate::default(),
}),
key_mode: JoinKeyMode::Seq,
kind: JoinKind::Inner,
strategy: super::super::planner::JoinStrategy::HashBroadcast,
watermark: Duration::from_secs(5),
},
target_nodes: vec![],
cost: CostEstimate::default(),
},
total_cost: CostEstimate::default(),
};
let running = fed.execute(plan).await.unwrap();
let rows: Vec<ResultRow> = collect_rows(running.rows)
.await
.into_iter()
.map(|r| r.unwrap())
.collect();
assert_eq!(rows.len(), 2);
let mut decoded: Vec<JoinedRowPayload> = rows
.iter()
.map(|r| postcard::from_bytes(&r.payload).unwrap())
.collect();
decoded.sort_by_key(|j| j.left.as_ref().unwrap().seq);
assert_eq!(decoded[0].left.as_ref().unwrap().payload, b"a-2");
assert_eq!(decoded[0].right.as_ref().unwrap().payload, b"b-2");
assert_eq!(decoded[1].left.as_ref().unwrap().payload, b"a-5");
assert_eq!(decoded[1].right.as_ref().unwrap().payload, b"b-5");
}
#[tokio::test]
async fn federated_left_outer_emits_unmatched_lefts_via_transport() {
use super::super::planner::{CostEstimate, JoinKeyMode};
use super::super::query::{JoinKind, JoinedRowPayload};
use std::time::Duration;
let a = 0xAAAA;
let b = 0xBBBB;
let node_a = local_executor_with(&[(a, 1, b"a-1"), (a, 2, b"a-2")]);
let node_b = local_executor_with(&[(b, 2, b"b-2")]);
let transport = Arc::new(LoopbackTransport::new());
transport.register(0xA, node_a);
transport.register(0xB, node_b);
let fed = FederatedMeshQueryExecutor::new(transport);
let plan = ExecutionPlan {
root: OperatorNode {
operator: OperatorPlan::HashJoin {
left: Box::new(OperatorNode {
operator: OperatorPlan::BetweenRead {
origin: a,
start: SeqNum(1),
end: SeqNum(10),
},
target_nodes: vec![0xA],
cost: CostEstimate::default(),
}),
right: Box::new(OperatorNode {
operator: OperatorPlan::BetweenRead {
origin: b,
start: SeqNum(1),
end: SeqNum(10),
},
target_nodes: vec![0xB],
cost: CostEstimate::default(),
}),
key_mode: JoinKeyMode::Seq,
kind: JoinKind::LeftOuter,
strategy: super::super::planner::JoinStrategy::HashBroadcast,
watermark: Duration::from_secs(5),
},
target_nodes: vec![],
cost: CostEstimate::default(),
},
total_cost: CostEstimate::default(),
};
let running = fed.execute(plan).await.unwrap();
let rows: Vec<ResultRow> = collect_rows(running.rows)
.await
.into_iter()
.map(|r| r.unwrap())
.collect();
assert_eq!(rows.len(), 2);
let decoded: Vec<JoinedRowPayload> = rows
.iter()
.map(|r| postcard::from_bytes(&r.payload).unwrap())
.collect();
let matched = decoded.iter().filter(|j| j.right.is_some()).count();
let unmatched = decoded.iter().filter(|j| j.right.is_none()).count();
assert_eq!(matched, 1);
assert_eq!(unmatched, 1);
assert!(decoded.iter().all(|j| j.left.is_some()));
}
#[tokio::test]
async fn federated_aggregate_count_no_group_by_returns_total() {
use super::super::planner::CostEstimate;
use super::super::query::{AggregateRowPayload, AggregateValue};
let chain = 0xCAFE;
let node = local_executor_with(&[
(chain, 1, b"x"),
(chain, 2, b"y"),
(chain, 3, b"z"),
(chain, 4, b"w"),
]);
let transport = Arc::new(LoopbackTransport::new());
transport.register(0xA, node);
let fed = FederatedMeshQueryExecutor::new(transport);
let plan = ExecutionPlan {
root: OperatorNode {
operator: OperatorPlan::AggregateCount {
input: Box::new(OperatorNode {
operator: OperatorPlan::BetweenRead {
origin: chain,
start: SeqNum(1),
end: SeqNum(10),
},
target_nodes: vec![0xA],
cost: CostEstimate::default(),
}),
group_by: None,
},
target_nodes: vec![],
cost: CostEstimate::default(),
},
total_cost: CostEstimate::default(),
};
let running = fed.execute(plan).await.unwrap();
let rows: Vec<ResultRow> = collect_rows(running.rows)
.await
.into_iter()
.map(|r| r.unwrap())
.collect();
assert_eq!(rows.len(), 1);
let decoded: AggregateRowPayload = postcard::from_bytes(&rows[0].payload).unwrap();
assert_eq!(decoded.group, None);
assert_eq!(decoded.value, AggregateValue::Count(4));
}
#[test]
fn call_id_is_unique_across_federated_executors_on_same_host() {
let t1 = Arc::new(LoopbackTransport::new());
let t2 = Arc::new(LoopbackTransport::new());
let fed1 = FederatedMeshQueryExecutor::new(t1);
let fed2 = FederatedMeshQueryExecutor::new(t2);
let mut seen = std::collections::HashSet::<u64>::new();
for _ in 0..32 {
assert!(seen.insert(fed1.allocate_id()), "fed1 self-collision");
assert!(seen.insert(fed2.allocate_id()), "fed2 self-collision");
}
}
#[tokio::test]
async fn cancel_after_composite_aggregate_short_circuits_materialized_stream() {
use super::super::planner::CostEstimate;
let chain = 0xC0DE;
let node = local_executor_with(&[(chain, 1, b"x"), (chain, 2, b"y"), (chain, 3, b"z")]);
let transport = Arc::new(LoopbackTransport::new());
transport.register(0xA, node);
let fed = FederatedMeshQueryExecutor::new(transport);
let plan = ExecutionPlan {
root: OperatorNode {
operator: OperatorPlan::AggregateCount {
input: Box::new(OperatorNode {
operator: OperatorPlan::BetweenRead {
origin: chain,
start: SeqNum(1),
end: SeqNum(10),
},
target_nodes: vec![0xA],
cost: CostEstimate::default(),
}),
group_by: None,
},
target_nodes: vec![],
cost: CostEstimate::default(),
},
total_cost: CostEstimate::default(),
};
let running = fed.execute(plan).await.unwrap();
running.handle.cancel();
let rows = collect_rows(running.rows).await;
assert!(
rows.iter()
.any(|r| matches!(r, Err(MeshError::QueryCancelled))),
"expected QueryCancelled to surface from a cancelled composite stream, got {rows:?}"
);
}
#[tokio::test]
async fn three_nodes_disjoint_chains_route_independently() {
let chain_x = 0x111;
let chain_y = 0x222;
let node_a = local_executor_with(&[(chain_x, 1, b"x-1")]);
let node_b = local_executor_with(&[(chain_y, 1, b"y-1")]);
let node_c = local_executor_with(&[]);
let transport = Arc::new(LoopbackTransport::new());
transport.register(0xA, node_a);
transport.register(0xB, node_b);
transport.register(0xC, node_c);
let fed = FederatedMeshQueryExecutor::new(transport);
let rows_x: Vec<_> = collect_rows(
fed.execute(plan_latest(chain_x, vec![0xA]))
.await
.unwrap()
.rows,
)
.await
.into_iter()
.map(|r| r.unwrap())
.collect();
assert_eq!(rows_x.len(), 1);
assert_eq!(rows_x[0].payload, b"x-1");
let rows_y: Vec<_> = collect_rows(
fed.execute(plan_latest(chain_y, vec![0xB]))
.await
.unwrap()
.rows,
)
.await
.into_iter()
.map(|r| r.unwrap())
.collect();
assert_eq!(rows_y.len(), 1);
assert_eq!(rows_y[0].payload, b"y-1");
}
}