use std::time::Duration;
use serde::{Deserialize, Serialize};
use super::error::MeshError;
use super::query::{AggregateFn, ChainRef, Expr, JoinKey, JoinKind, MeshQuery, QueryV1, SeqNum};
use crate::adapter::net::behavior::capability::CapabilityIndex;
use crate::adapter::net::behavior::predicate::PredicateWire;
use crate::adapter::net::behavior::query::CapabilityQuery;
use crate::adapter::net::behavior::tag::{Tag, TaxonomyAxis};
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ExecutionPlan {
pub root: OperatorNode,
pub total_cost: CostEstimate,
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct OperatorNode {
pub operator: OperatorPlan,
pub target_nodes: Vec<u64>,
pub cost: CostEstimate,
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum OperatorPlan {
AtRead {
origin: u64,
seq: SeqNum,
},
BetweenRead {
origin: u64,
start: SeqNum,
end: SeqNum,
},
LatestRead {
origin: u64,
},
Filter {
input: Box<OperatorNode>,
predicate: PredicateWire,
},
LineageEmit {
origin: u64,
direction: LineageDirection,
entries: Vec<LineageEntry>,
},
AggregateCount {
input: Box<OperatorNode>,
group_by: Option<JoinKeyMode>,
},
AggregateNumeric {
input: Box<OperatorNode>,
group_by: Option<JoinKeyMode>,
field_path: String,
kind: super::query::NumericAggregateKind,
},
AggregateReduction {
input: Box<OperatorNode>,
group_by: Option<JoinKeyMode>,
field_path: String,
kind: super::query::NumericReductionKind,
},
AggregateDistinct {
input: Box<OperatorNode>,
group_by: Option<JoinKeyMode>,
field_path: String,
},
Window {
input: Box<OperatorNode>,
spec: super::query::WindowSpec,
},
HashJoin {
left: Box<OperatorNode>,
right: Box<OperatorNode>,
key_mode: JoinKeyMode,
kind: JoinKind,
strategy: JoinStrategy,
watermark: Duration,
},
NotYetImplemented {
detail: String,
input: Option<Box<OperatorNode>>,
},
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum JoinKeyMode {
Origin,
Seq,
OriginSeq,
Field(String),
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum JoinStrategy {
HashBroadcast,
SortMerge,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum LineageDirection {
Back,
Forward,
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct LineageEntry {
pub origin: u64,
pub depth: u32,
pub tip_seq: Option<SeqNum>,
}
#[derive(Clone, Copy, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct CostEstimate {
pub bandwidth_bytes: u64,
pub latency_ms: u64,
}
const PHASE_A_ATOMIC_BANDWIDTH_BYTES: u64 = 64 * 1024;
pub struct MeshQueryPlanner<'a, F>
where
F: Fn(u64) -> Option<Duration>,
{
pub capability_index: &'a CapabilityIndex,
pub rtt_lookup: F,
}
impl<'a, F> MeshQueryPlanner<'a, F>
where
F: Fn(u64) -> Option<Duration>,
{
pub fn new(capability_index: &'a CapabilityIndex, rtt_lookup: F) -> Self {
Self {
capability_index,
rtt_lookup,
}
}
pub fn plan(&self, query: &MeshQuery) -> Result<ExecutionPlan, MeshError> {
let root = match query {
MeshQuery::V1(v1) => self.plan_v1(v1)?,
#[allow(unreachable_patterns)]
_ => {
return Err(MeshError::PlannerError {
detail: "unsupported query version".to_string(),
});
}
};
let total_cost = sum_cost(&root);
Ok(ExecutionPlan { root, total_cost })
}
fn plan_v1(&self, q: &QueryV1) -> Result<OperatorNode, MeshError> {
match q {
QueryV1::At { origin, seq } => self.plan_at(origin, *seq),
QueryV1::Between { origin, start, end } => self.plan_between(origin, *start, *end),
QueryV1::Latest { origin } => self.plan_latest(origin),
QueryV1::Filter { inner, predicate } => {
let input = self.plan(inner)?;
let cost = CostEstimate {
bandwidth_bytes: 0,
latency_ms: 0,
};
Ok(OperatorNode {
operator: OperatorPlan::Filter {
input: Box::new(input.root),
predicate: predicate.clone(),
},
target_nodes: vec![],
cost,
})
}
QueryV1::LineageBack { origin, max_depth } => {
self.plan_lineage(origin, *max_depth, LineageDirection::Back)
}
QueryV1::LineageForward { origin, max_depth } => {
self.plan_lineage(origin, *max_depth, LineageDirection::Forward)
}
QueryV1::Join {
left,
right,
on,
kind,
watermark,
} => self.plan_join(left, right, on, *kind, *watermark),
QueryV1::Aggregate {
inner,
group_by,
agg_fn,
} => self.plan_aggregate(inner, group_by, agg_fn),
QueryV1::Project { inner, .. } => {
let input = self.plan(inner)?;
self.plan_not_yet_implemented("Project (Phase A.2)", Some(Box::new(input.root)))
}
QueryV1::OrderBy { inner, .. } => {
let input = self.plan(inner)?;
self.plan_not_yet_implemented("OrderBy (Phase A.2)", Some(Box::new(input.root)))
}
QueryV1::Window { inner, spec } => self.plan_window(inner, spec),
}
}
fn plan_window(
&self,
inner: &MeshQuery,
spec: &super::query::WindowSpec,
) -> Result<OperatorNode, MeshError> {
match spec {
super::query::WindowSpec::TumblingSeq { size } if *size == 0 => {
return Err(MeshError::PlannerError {
detail: "Window size must be >= 1".to_string(),
});
}
_ => {}
}
let inner_plan = self.plan(inner)?;
let cost = CostEstimate {
bandwidth_bytes: inner_plan.total_cost.bandwidth_bytes,
latency_ms: inner_plan.total_cost.latency_ms,
};
Ok(OperatorNode {
operator: OperatorPlan::Window {
input: Box::new(inner_plan.root),
spec: spec.clone(),
},
target_nodes: vec![],
cost,
})
}
fn plan_at(&self, origin: &ChainRef, seq: SeqNum) -> Result<OperatorNode, MeshError> {
let origin_hash = self.resolve_origin(origin)?;
let coverage = self.collect_coverage(origin_hash);
let targets = self.select_targets_at(&coverage, seq);
if targets.is_empty() && !coverage.is_empty() {
return Err(MeshError::HistoricalRangeUnavailable {
origin: origin_hash,
requested: seq..SeqNum(seq.0.saturating_add(1)),
available: coverage
.into_iter()
.filter_map(|c| c.claim.advertised())
.collect(),
});
}
let cost = self.atomic_cost(&targets);
Ok(OperatorNode {
operator: OperatorPlan::AtRead {
origin: origin_hash,
seq,
},
target_nodes: targets,
cost,
})
}
fn plan_between(
&self,
origin: &ChainRef,
start: SeqNum,
end: SeqNum,
) -> Result<OperatorNode, MeshError> {
if start >= end {
return Err(MeshError::PlannerError {
detail: format!("Between requires start < end; got {start:?} >= {end:?}"),
});
}
let origin_hash = self.resolve_origin(origin)?;
let coverage = self.collect_coverage(origin_hash);
let targets = self.select_targets_between(&coverage, start, end);
if targets.is_empty() && !coverage.is_empty() {
return Err(MeshError::HistoricalRangeUnavailable {
origin: origin_hash,
requested: start..end,
available: coverage
.into_iter()
.filter_map(|c| c.claim.advertised())
.collect(),
});
}
let cost = self.atomic_cost(&targets);
Ok(OperatorNode {
operator: OperatorPlan::BetweenRead {
origin: origin_hash,
start,
end,
},
target_nodes: targets,
cost,
})
}
fn plan_latest(&self, origin: &ChainRef) -> Result<OperatorNode, MeshError> {
let origin_hash = self.resolve_origin(origin)?;
let coverage = self.collect_coverage(origin_hash);
let targets = self.select_targets_latest(&coverage);
let cost = self.atomic_cost(&targets);
Ok(OperatorNode {
operator: OperatorPlan::LatestRead {
origin: origin_hash,
},
target_nodes: targets,
cost,
})
}
fn plan_lineage(
&self,
origin: &ChainRef,
max_depth: u32,
direction: LineageDirection,
) -> Result<OperatorNode, MeshError> {
let origin_hash = self.resolve_origin(origin)?;
let entries = match direction {
LineageDirection::Back => self.walk_lineage_back(origin_hash, max_depth)?,
LineageDirection::Forward => self.walk_lineage_forward(origin_hash, max_depth)?,
};
let cost = CostEstimate {
bandwidth_bytes: entries.len() as u64 * 64,
latency_ms: 0,
};
Ok(OperatorNode {
operator: OperatorPlan::LineageEmit {
origin: origin_hash,
direction,
entries,
},
target_nodes: vec![],
cost,
})
}
fn plan_join(
&self,
left: &MeshQuery,
right: &MeshQuery,
on: &JoinKey,
kind: JoinKind,
watermark: Duration,
) -> Result<OperatorNode, MeshError> {
let key_mode = key_mode_for_join(on)?;
let left_plan = self.plan(left)?;
let right_plan = self.plan(right)?;
let cost = CostEstimate {
bandwidth_bytes: left_plan
.total_cost
.bandwidth_bytes
.saturating_add(right_plan.total_cost.bandwidth_bytes),
latency_ms: left_plan
.total_cost
.latency_ms
.max(right_plan.total_cost.latency_ms),
};
let strategy = JoinStrategy::HashBroadcast;
Ok(OperatorNode {
operator: OperatorPlan::HashJoin {
left: Box::new(left_plan.root),
right: Box::new(right_plan.root),
key_mode,
kind,
strategy,
watermark,
},
target_nodes: vec![],
cost,
})
}
fn plan_aggregate(
&self,
inner: &MeshQuery,
group_by: &[Expr],
agg_fn: &AggregateFn,
) -> Result<OperatorNode, MeshError> {
let key_mode = group_by_mode(group_by)?;
let inner_plan = self.plan(inner)?;
let cost = CostEstimate {
bandwidth_bytes: inner_plan.total_cost.bandwidth_bytes,
latency_ms: inner_plan.total_cost.latency_ms,
};
let operator = match agg_fn {
AggregateFn::Count => OperatorPlan::AggregateCount {
input: Box::new(inner_plan.root),
group_by: key_mode,
},
AggregateFn::Sum { field } => {
let path = field_path_required(field, "Sum")?;
OperatorPlan::AggregateNumeric {
input: Box::new(inner_plan.root),
group_by: key_mode,
field_path: path,
kind: super::query::NumericAggregateKind::Sum,
}
}
AggregateFn::Avg { field } => {
let path = field_path_required(field, "Avg")?;
OperatorPlan::AggregateNumeric {
input: Box::new(inner_plan.root),
group_by: key_mode,
field_path: path,
kind: super::query::NumericAggregateKind::Avg,
}
}
AggregateFn::Min { field } => {
let path = field_path_required(field, "Min")?;
OperatorPlan::AggregateReduction {
input: Box::new(inner_plan.root),
group_by: key_mode,
field_path: path,
kind: super::query::NumericReductionKind::Min,
}
}
AggregateFn::Max { field } => {
let path = field_path_required(field, "Max")?;
OperatorPlan::AggregateReduction {
input: Box::new(inner_plan.root),
group_by: key_mode,
field_path: path,
kind: super::query::NumericReductionKind::Max,
}
}
AggregateFn::PercentileExact { field, p } => {
if !p.is_finite() || !(0.0..=1.0).contains(p) {
return Err(MeshError::PlannerError {
detail: format!("PercentileExact p must be in [0.0, 1.0], got {p}"),
});
}
let path = field_path_required(field, "PercentileExact")?;
OperatorPlan::AggregateReduction {
input: Box::new(inner_plan.root),
group_by: key_mode,
field_path: path,
kind: super::query::NumericReductionKind::Percentile { p: *p },
}
}
AggregateFn::DistinctCountExact { field } => {
let path = field_path_required(field, "DistinctCountExact")?;
OperatorPlan::AggregateDistinct {
input: Box::new(inner_plan.root),
group_by: key_mode,
field_path: path,
}
}
AggregateFn::DistinctCountHll { .. } | AggregateFn::PercentileTDigest { .. } => {
return Err(MeshError::PlannerError {
detail: format!(
"aggregate function {agg_fn:?} requires a sketch implementation (HLL p=14 / T-Digest c=100 per locked decision #3); deferred to Phase F. Use DistinctCountExact / PercentileExact for now."
),
});
}
};
Ok(OperatorNode {
operator,
target_nodes: vec![],
cost,
})
}
fn walk_lineage_back(
&self,
start: u64,
max_depth: u32,
) -> Result<Vec<LineageEntry>, MeshError> {
let mut visited: std::collections::HashSet<u64> = std::collections::HashSet::new();
visited.insert(start);
let mut entries = vec![LineageEntry {
origin: start,
depth: 0,
tip_seq: self.best_tip(start),
}];
let mut current = start;
for depth in 1..=max_depth {
let Some(parent) = self.parent_of(current) else {
return Ok(entries);
};
if !visited.insert(parent) {
let mut cycle: Vec<u64> = entries
.iter()
.map(|e| e.origin)
.skip_while(|o| *o != parent)
.collect();
cycle.push(parent);
return Err(MeshError::LineageCycleDetected {
origin: start,
cycle,
});
}
entries.push(LineageEntry {
origin: parent,
depth,
tip_seq: self.best_tip(parent),
});
current = parent;
}
if max_depth > 0 && self.parent_of(current).is_some() {
return Err(MeshError::LineageMaxDepthExceeded {
origin: start,
depth: max_depth,
});
}
Ok(entries)
}
fn walk_lineage_forward(
&self,
start: u64,
max_depth: u32,
) -> Result<Vec<LineageEntry>, MeshError> {
use std::collections::VecDeque;
let mut visited: std::collections::HashSet<u64> = std::collections::HashSet::new();
visited.insert(start);
let mut entries = vec![LineageEntry {
origin: start,
depth: 0,
tip_seq: self.best_tip(start),
}];
let mut frontier: VecDeque<(u64, u32)> = VecDeque::new();
frontier.push_back((start, 0));
while let Some((current, depth)) = frontier.pop_front() {
let mut children = self.children_of(current);
if depth >= max_depth {
if max_depth > 0 && !children.is_empty() {
return Err(MeshError::LineageMaxDepthExceeded {
origin: start,
depth: max_depth,
});
}
continue;
}
children.sort_unstable();
for child in children {
if !visited.insert(child) {
continue;
}
entries.push(LineageEntry {
origin: child,
depth: depth + 1,
tip_seq: self.best_tip(child),
});
frontier.push_back((child, depth + 1));
}
}
Ok(entries)
}
fn parent_of(&self, child: u64) -> Option<u64> {
let mut candidates: Vec<(u64, u64)> = Vec::new();
for node_id in self.capability_index.all_nodes() {
let Some(caps) = self.capability_index.get(node_id) else {
continue;
};
let mut hosts_child = false;
let mut fork_candidates: Vec<u64> = Vec::new();
for tag in &caps.tags {
let Tag::Reserved { prefix, body } = tag else {
continue;
};
match prefix.as_str() {
"causal:" if parse_causal_body(body) == Some(child) => {
hosts_child = true;
}
"fork-of:" => {
if let Some(p) = parse_fork_body(body) {
fork_candidates.push(p);
}
}
_ => {}
}
}
if hosts_child {
for parent in fork_candidates {
candidates.push((parent, node_id));
}
}
}
candidates.sort_unstable();
candidates.first().map(|(parent, _)| *parent)
}
fn children_of(&self, parent: u64) -> Vec<u64> {
let mut out = Vec::new();
for node_id in self.capability_index.all_nodes() {
let Some(caps) = self.capability_index.get(node_id) else {
continue;
};
let mut has_fork_to_parent = false;
let mut causal_candidates: Vec<u64> = Vec::new();
for tag in &caps.tags {
let Tag::Reserved { prefix, body } = tag else {
continue;
};
match prefix.as_str() {
"fork-of:" if parse_fork_body(body) == Some(parent) => {
has_fork_to_parent = true;
}
"causal:" => {
if let Some(origin) = parse_causal_body(body) {
causal_candidates.push(origin);
}
}
_ => {}
}
}
if has_fork_to_parent {
causal_candidates.sort_unstable();
if let Some(&chain) = causal_candidates.first() {
if chain != parent {
out.push(chain);
}
}
}
}
out.sort_unstable();
out.dedup();
out
}
fn best_tip(&self, chain: u64) -> Option<SeqNum> {
self.collect_coverage(chain)
.into_iter()
.filter_map(|c| c.claim.latest_tip())
.max()
}
#[expect(
clippy::expect_used,
reason = "match arm `1` guarantees the iterator yields exactly one element"
)]
fn resolve_origin(&self, origin: &ChainRef) -> Result<u64, MeshError> {
match origin {
ChainRef::OriginHash(h) => Ok(*h),
ChainRef::Discovered(wire) => {
let predicate =
wire.clone()
.into_predicate()
.map_err(|e| MeshError::PlannerError {
detail: format!("Discovered predicate rebuild failed: {e:?}"),
})?;
let candidates = self.capability_index.filter(&predicate);
let mut origins: std::collections::BTreeSet<u64> =
std::collections::BTreeSet::new();
for (_node_id, caps) in &candidates {
for tag in &caps.tags {
if let Some(hash) = parse_causal_origin(tag) {
origins.insert(hash);
}
}
}
match origins.len() {
0 => Err(MeshError::NoCapableHolder {
origin: 0,
requirement: format!("{:?}", predicate),
}),
1 => Ok(*origins.iter().next().expect("len == 1")),
_ => Err(MeshError::AmbiguousDiscovery {
matches: origins.into_iter().collect(),
requirement: format!("{:?}", predicate),
}),
}
}
}
}
fn collect_coverage(&self, origin_hash: u64) -> Vec<HolderCoverage> {
let hex = chain_hex(origin_hash);
let mut out: Vec<HolderCoverage> = Vec::new();
for node_id in self.capability_index.all_nodes() {
let claim = self
.capability_index
.with_caps(node_id, |caps| {
caps.tags
.iter()
.filter_map(|t| parse_causal_claim(t, &hex))
.max_by(|a, b| {
specificity_rank(a)
.cmp(&specificity_rank(b))
.then_with(|| claim_cmp_key(a).cmp(&claim_cmp_key(b)))
})
})
.unwrap_or(None);
if let Some(claim) = claim {
out.push(HolderCoverage {
node_id,
rtt: (self.rtt_lookup)(node_id),
claim,
});
}
}
sort_by_proximity(&mut out);
out
}
fn select_targets_at(&self, coverage: &[HolderCoverage], seq: SeqNum) -> Vec<u64> {
coverage
.iter()
.filter(|c| c.claim.covers_seq(seq))
.map(|c| c.node_id)
.collect()
}
fn select_targets_between(
&self,
coverage: &[HolderCoverage],
start: SeqNum,
end: SeqNum,
) -> Vec<u64> {
coverage
.iter()
.filter(|c| c.claim.covers_range(start, end))
.map(|c| c.node_id)
.collect()
}
fn select_targets_latest(&self, coverage: &[HolderCoverage]) -> Vec<u64> {
let mut with_tip: Vec<&HolderCoverage> = coverage
.iter()
.filter(|c| c.claim.latest_tip().is_some())
.collect();
with_tip.sort_by_key(|c| std::cmp::Reverse(c.claim.latest_tip()));
let mut out: Vec<u64> = with_tip.iter().map(|c| c.node_id).collect();
for c in coverage {
if c.claim.latest_tip().is_none() {
out.push(c.node_id);
}
}
out
}
fn atomic_cost(&self, targets: &[u64]) -> CostEstimate {
let bandwidth_bytes = (targets.len() as u64) * PHASE_A_ATOMIC_BANDWIDTH_BYTES;
let latency_ms = targets
.iter()
.filter_map(|nid| (self.rtt_lookup)(*nid))
.map(|d| d.as_millis() as u64)
.min()
.unwrap_or(0);
CostEstimate {
bandwidth_bytes,
latency_ms,
}
}
fn plan_not_yet_implemented(
&self,
detail: &str,
input: Option<Box<OperatorNode>>,
) -> Result<OperatorNode, MeshError> {
Ok(OperatorNode {
operator: OperatorPlan::NotYetImplemented {
detail: detail.to_string(),
input,
},
target_nodes: vec![],
cost: CostEstimate::default(),
})
}
}
fn sum_cost(node: &OperatorNode) -> CostEstimate {
let mut acc = node.cost;
match &node.operator {
OperatorPlan::Filter { input, .. } => {
let inner = sum_cost(input);
acc.bandwidth_bytes = acc.bandwidth_bytes.saturating_add(inner.bandwidth_bytes);
acc.latency_ms = acc.latency_ms.saturating_add(inner.latency_ms);
}
OperatorPlan::HashJoin { left, right, .. } => {
let l = sum_cost(left);
let r = sum_cost(right);
acc.bandwidth_bytes = acc
.bandwidth_bytes
.saturating_add(l.bandwidth_bytes)
.saturating_add(r.bandwidth_bytes);
acc.latency_ms = acc
.latency_ms
.saturating_add(l.latency_ms.max(r.latency_ms));
}
OperatorPlan::AggregateCount { input, .. } => {
let inner = sum_cost(input);
acc.bandwidth_bytes = acc.bandwidth_bytes.saturating_add(inner.bandwidth_bytes);
acc.latency_ms = acc.latency_ms.saturating_add(inner.latency_ms);
}
OperatorPlan::AggregateNumeric { input, .. } => {
let inner = sum_cost(input);
acc.bandwidth_bytes = acc.bandwidth_bytes.saturating_add(inner.bandwidth_bytes);
acc.latency_ms = acc.latency_ms.saturating_add(inner.latency_ms);
}
OperatorPlan::AggregateReduction { input, .. }
| OperatorPlan::AggregateDistinct { input, .. }
| OperatorPlan::Window { input, .. } => {
let inner = sum_cost(input);
acc.bandwidth_bytes = acc.bandwidth_bytes.saturating_add(inner.bandwidth_bytes);
acc.latency_ms = acc.latency_ms.saturating_add(inner.latency_ms);
}
OperatorPlan::NotYetImplemented {
input: Some(input), ..
} => {
let inner = sum_cost(input);
acc.bandwidth_bytes = acc.bandwidth_bytes.saturating_add(inner.bandwidth_bytes);
acc.latency_ms = acc.latency_ms.saturating_add(inner.latency_ms);
}
OperatorPlan::AtRead { .. }
| OperatorPlan::BetweenRead { .. }
| OperatorPlan::LatestRead { .. }
| OperatorPlan::LineageEmit { .. }
| OperatorPlan::NotYetImplemented { input: None, .. } => {}
}
acc
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum CausalClaim {
Presence,
Tip { tip_seq: SeqNum },
Range { start: SeqNum, end: SeqNum },
}
impl CausalClaim {
fn covers_seq(&self, seq: SeqNum) -> bool {
match self {
Self::Presence => true,
Self::Tip { tip_seq } => seq.0 <= tip_seq.0,
Self::Range { start, end } => seq.0 >= start.0 && seq.0 < end.0,
}
}
fn covers_range(&self, start: SeqNum, end: SeqNum) -> bool {
match self {
Self::Presence => true,
Self::Tip { tip_seq } => end.0 <= tip_seq.0.saturating_add(1),
Self::Range { start: s, end: e } => s.0 <= start.0 && end.0 <= e.0,
}
}
fn advertised(&self) -> Option<std::ops::Range<SeqNum>> {
match self {
Self::Presence => None,
Self::Tip { tip_seq } => Some(SeqNum(0)..SeqNum(tip_seq.0.saturating_add(1))),
Self::Range { start, end } => Some(*start..*end),
}
}
fn latest_tip(&self) -> Option<SeqNum> {
match self {
Self::Presence => None,
Self::Tip { tip_seq } => Some(*tip_seq),
Self::Range { end, .. } => Some(SeqNum(end.0.saturating_sub(1))),
}
}
}
#[derive(Clone, Debug)]
struct HolderCoverage {
node_id: u64,
rtt: Option<Duration>,
claim: CausalClaim,
}
fn specificity_rank(claim: &CausalClaim) -> u8 {
match claim {
CausalClaim::Range { .. } => 2,
CausalClaim::Tip { .. } => 1,
CausalClaim::Presence => 0,
}
}
fn claim_cmp_key(claim: &CausalClaim) -> (u64, u64, u8) {
match claim {
CausalClaim::Range { start, end } => (start.0, end.0, 2),
CausalClaim::Tip { tip_seq } => (0, tip_seq.0, 1),
CausalClaim::Presence => (u64::MAX, u64::MAX, 0),
}
}
fn sort_by_proximity(coverage: &mut [HolderCoverage]) {
coverage.sort_by(|a, b| match (a.rtt, b.rtt) {
(Some(ra), Some(rb)) => ra.cmp(&rb).then_with(|| a.node_id.cmp(&b.node_id)),
(Some(_), None) => std::cmp::Ordering::Less,
(None, Some(_)) => std::cmp::Ordering::Greater,
(None, None) => a.node_id.cmp(&b.node_id),
});
}
const HEX_NIBBLES: &[u8; 16] = b"0123456789abcdef";
fn chain_hex(origin_hash: u64) -> String {
let mut buf = [0u8; 16];
let mut h = origin_hash;
for i in (0..16).rev() {
buf[i] = HEX_NIBBLES[(h & 0xF) as usize];
h >>= 4;
}
unsafe { String::from_utf8_unchecked(buf.to_vec()) }
}
fn parse_causal_claim(tag: &Tag, origin_hex: &str) -> Option<CausalClaim> {
let Tag::Reserved { prefix, body } = tag else {
return None;
};
if prefix != "causal:" {
return None;
}
if !body.starts_with(origin_hex) {
return None;
}
let rest = &body[origin_hex.len()..];
if rest.is_empty() {
return Some(CausalClaim::Presence);
}
if let Some(tip_str) = rest.strip_prefix(':') {
let tip: u64 = tip_str.parse().ok()?;
return Some(CausalClaim::Tip {
tip_seq: SeqNum(tip),
});
}
if let Some(range_body) = rest.strip_prefix('[').and_then(|s| s.strip_suffix(']')) {
let (start_str, end_str) = range_body.split_once("..")?;
let start: u64 = start_str.parse().ok()?;
let end: u64 = end_str.parse().ok()?;
if start >= end {
return None;
}
return Some(CausalClaim::Range {
start: SeqNum(start),
end: SeqNum(end),
});
}
None
}
fn parse_causal_origin(tag: &Tag) -> Option<u64> {
let Tag::Reserved { prefix, body } = tag else {
return None;
};
if prefix != "causal:" {
return None;
}
parse_causal_body(body)
}
fn parse_causal_body(body: &str) -> Option<u64> {
let stem = body.split_once([':', '[']).map(|(s, _)| s).unwrap_or(body);
if stem.len() != 16 {
return None;
}
u64::from_str_radix(stem, 16).ok()
}
fn parse_fork_body(body: &str) -> Option<u64> {
if body.len() != 16 {
return None;
}
u64::from_str_radix(body, 16).ok()
}
fn key_mode_for_join(on: &JoinKey) -> Result<JoinKeyMode, MeshError> {
let left = field_name(&on.left_field).ok_or_else(|| MeshError::PlannerError {
detail: format!(
"join left key must be a field reference, got {:?}",
on.left_field
),
})?;
let right = field_name(&on.right_field).ok_or_else(|| MeshError::PlannerError {
detail: format!(
"join right key must be a field reference, got {:?}",
on.right_field
),
})?;
if left != right {
return Err(MeshError::PlannerError {
detail: format!(
"join key sides must reference the same field name (left='{left}', right='{right}')"
),
});
}
Ok(match left {
"origin" => JoinKeyMode::Origin,
"seq" => JoinKeyMode::Seq,
"origin,seq" => JoinKeyMode::OriginSeq,
other => JoinKeyMode::Field(other.to_string()),
})
}
fn field_name(e: &Expr) -> Option<&str> {
match e {
Expr::Field(s) => Some(s.as_str()),
_ => None,
}
}
fn field_path_required(e: &Expr, op_name: &str) -> Result<String, MeshError> {
match e {
Expr::Field(s) => Ok(s.clone()),
other => Err(MeshError::PlannerError {
detail: format!(
"{op_name} requires Expr::Field(<path>) for its field argument, got {other:?}"
),
}),
}
}
fn group_by_mode(group_by: &[Expr]) -> Result<Option<JoinKeyMode>, MeshError> {
if group_by.is_empty() {
return Ok(None);
}
if group_by.len() == 1 {
let name = field_name(&group_by[0]).ok_or_else(|| MeshError::PlannerError {
detail: format!(
"group_by[0] must be a field reference, got {:?}",
group_by[0]
),
})?;
return match name {
"origin" => Ok(Some(JoinKeyMode::Origin)),
"seq" => Ok(Some(JoinKeyMode::Seq)),
other => Err(MeshError::PlannerError {
detail: format!(
"group_by field '{other}' is not a row-intrinsic key; only 'origin' / 'seq' supported in Phase E-1"
),
}),
};
}
if group_by.len() == 2 {
let l = field_name(&group_by[0]);
let r = field_name(&group_by[1]);
if matches!(
(l, r),
(Some("origin"), Some("seq")) | (Some("seq"), Some("origin"))
) {
return Ok(Some(JoinKeyMode::OriginSeq));
}
}
Err(MeshError::PlannerError {
detail: format!(
"group_by shape {group_by:?} not supported in Phase E-1; only [origin], [seq], or [origin, seq] are row-intrinsic"
),
})
}
#[allow(dead_code)]
const _PLANNER_USES_TAXONOMY_AXIS: TaxonomyAxis = TaxonomyAxis::Dataforts;
#[allow(dead_code)]
fn _planner_uses_capability_query<Q: CapabilityQuery>(_q: &Q) {}
#[cfg(test)]
mod tests {
use super::*;
use crate::adapter::net::behavior::capability::{
CapabilityAnnouncement, CapabilityIndex, CapabilitySet,
};
use crate::adapter::net::identity::EntityId;
#[test]
fn chain_hex_matches_format_macro_byte_for_byte() {
let cases: &[u64] = &[
0,
1,
0xF,
0x10,
0xFF,
0x100,
0xDEAD_BEEF,
0x8000_0000_0000_0000,
0x7FFF_FFFF_FFFF_FFFF,
u64::MAX,
u64::MAX - 1,
0x0123_4567_89AB_CDEF,
0xFEDC_BA98_7654_3210,
0xCAFE_BABE_DEAD_BEEF,
0x1234_5678_9ABC_DEF0,
];
for &h in cases {
let reference = format!("{h:016x}");
let actual = chain_hex(h);
assert_eq!(
actual, reference,
"chain_hex({h:#x}) diverged from format!(\"{{:016x}}\")",
);
assert_eq!(actual.len(), 16, "chain_hex must always emit 16 chars");
}
}
fn causal_tag(body: impl Into<String>) -> Tag {
Tag::Reserved {
prefix: "causal:".to_string(),
body: body.into(),
}
}
fn caps_with_causal_presence(origin_hash: u64) -> CapabilitySet {
let mut caps = CapabilitySet::new();
caps.tags.insert(causal_tag(chain_hex(origin_hash)));
caps
}
fn caps_with_causal_tip(origin_hash: u64, tip: u64) -> CapabilitySet {
let mut caps = CapabilitySet::new();
caps.tags
.insert(causal_tag(format!("{}:{}", chain_hex(origin_hash), tip)));
caps
}
fn caps_with_causal_range(origin_hash: u64, start: u64, end: u64) -> CapabilitySet {
let mut caps = CapabilitySet::new();
caps.tags.insert(causal_tag(format!(
"{}[{}..{}]",
chain_hex(origin_hash),
start,
end
)));
caps
}
fn index_with(holders: Vec<(u64, CapabilitySet)>) -> CapabilityIndex {
let index = CapabilityIndex::new();
for (node_id, caps) in holders {
index.index(CapabilityAnnouncement::new(
node_id,
EntityId::from_bytes([node_id as u8; 32]),
1,
caps,
));
}
index
}
fn make_index_with_holder(node_id: u64, origin_hash: u64) -> CapabilityIndex {
index_with(vec![(node_id, caps_with_causal_presence(origin_hash))])
}
fn empty_index() -> CapabilityIndex {
CapabilityIndex::new()
}
fn rtt_none(_nid: u64) -> Option<Duration> {
None
}
#[test]
fn parse_causal_presence_form() {
let origin = 0xDEAD_BEEF_CAFE_BABE_u64;
let hex = chain_hex(origin);
let claim = parse_causal_claim(&causal_tag(hex.clone()), &hex);
assert_eq!(claim, Some(CausalClaim::Presence));
}
#[test]
fn parse_causal_tip_form() {
let origin = 0x1234_5678_9ABC_DEF0_u64;
let hex = chain_hex(origin);
let claim = parse_causal_claim(&causal_tag(format!("{hex}:1000")), &hex);
assert_eq!(
claim,
Some(CausalClaim::Tip {
tip_seq: SeqNum(1000)
})
);
}
#[test]
fn parse_causal_range_form() {
let origin = 0xAAAA_BBBB_CCCC_DDDD_u64;
let hex = chain_hex(origin);
let claim = parse_causal_claim(&causal_tag(format!("{hex}[100..500]")), &hex);
assert_eq!(
claim,
Some(CausalClaim::Range {
start: SeqNum(100),
end: SeqNum(500),
})
);
}
#[test]
fn parse_causal_rejects_inverted_range() {
let hex = chain_hex(1);
let claim = parse_causal_claim(&causal_tag(format!("{hex}[500..100]")), &hex);
assert_eq!(claim, None);
}
#[test]
fn parse_causal_rejects_unknown_suffix() {
let hex = chain_hex(1);
let claim = parse_causal_claim(&causal_tag(format!("{hex}?weird")), &hex);
assert_eq!(claim, None);
}
#[test]
fn parse_causal_rejects_wrong_hash() {
let other_hex = chain_hex(0xFFFF);
let claim = parse_causal_claim(&causal_tag(format!("{other_hex}:42")), &chain_hex(0xAAAA));
assert_eq!(claim, None);
}
#[test]
fn causal_claim_covers_seq_semantics() {
assert!(CausalClaim::Presence.covers_seq(SeqNum(0)));
assert!(CausalClaim::Presence.covers_seq(SeqNum(u64::MAX)));
let tip = CausalClaim::Tip {
tip_seq: SeqNum(100),
};
assert!(tip.covers_seq(SeqNum(0)));
assert!(tip.covers_seq(SeqNum(100)));
assert!(!tip.covers_seq(SeqNum(101)));
let range = CausalClaim::Range {
start: SeqNum(50),
end: SeqNum(150),
};
assert!(!range.covers_seq(SeqNum(49)));
assert!(range.covers_seq(SeqNum(50)));
assert!(range.covers_seq(SeqNum(149)));
assert!(!range.covers_seq(SeqNum(150))); }
#[test]
fn causal_claim_covers_range_semantics() {
assert!(CausalClaim::Presence.covers_range(SeqNum(0), SeqNum(1_000)));
let tip = CausalClaim::Tip {
tip_seq: SeqNum(100),
};
assert!(tip.covers_range(SeqNum(0), SeqNum(101)));
assert!(tip.covers_range(SeqNum(50), SeqNum(101)));
assert!(!tip.covers_range(SeqNum(0), SeqNum(102)));
let range = CausalClaim::Range {
start: SeqNum(100),
end: SeqNum(200),
};
assert!(range.covers_range(SeqNum(100), SeqNum(200)));
assert!(range.covers_range(SeqNum(150), SeqNum(175)));
assert!(!range.covers_range(SeqNum(50), SeqNum(150))); assert!(!range.covers_range(SeqNum(150), SeqNum(250))); }
#[test]
fn causal_claim_advertised_renders_half_open_range() {
assert_eq!(CausalClaim::Presence.advertised(), None);
assert_eq!(
(CausalClaim::Tip {
tip_seq: SeqNum(99)
})
.advertised(),
Some(SeqNum(0)..SeqNum(100))
);
assert_eq!(
(CausalClaim::Range {
start: SeqNum(10),
end: SeqNum(50),
})
.advertised(),
Some(SeqNum(10)..SeqNum(50))
);
}
#[test]
fn causal_claim_latest_tip_ordering() {
assert_eq!(CausalClaim::Presence.latest_tip(), None);
assert_eq!(
(CausalClaim::Tip {
tip_seq: SeqNum(42)
})
.latest_tip(),
Some(SeqNum(42))
);
assert_eq!(
(CausalClaim::Range {
start: SeqNum(10),
end: SeqNum(50),
})
.latest_tip(),
Some(SeqNum(49))
);
}
#[test]
fn parse_causal_origin_extracts_u64_from_each_form() {
let origin = 0xCAFE_BABE_DEAD_BEEF_u64;
let hex = chain_hex(origin);
assert_eq!(parse_causal_origin(&causal_tag(hex.clone())), Some(origin));
assert_eq!(
parse_causal_origin(&causal_tag(format!("{hex}:42"))),
Some(origin)
);
assert_eq!(
parse_causal_origin(&causal_tag(format!("{hex}[0..100]"))),
Some(origin)
);
assert_eq!(
parse_causal_origin(&Tag::Reserved {
prefix: "heat:".to_string(),
body: hex.clone(),
}),
None
);
assert_eq!(parse_causal_origin(&causal_tag("abc".to_string())), None);
}
#[test]
fn plan_latest_returns_atomic_with_holder() {
let origin = 0xABAB_ABAB_ABAB_ABAB_u64;
let index = make_index_with_holder(42, origin);
let planner = MeshQueryPlanner::new(&index, rtt_none);
let plan = planner
.plan(&MeshQuery::V1(QueryV1::Latest {
origin: ChainRef::OriginHash(origin),
}))
.expect("plan ok");
match plan.root.operator {
OperatorPlan::LatestRead { origin: o } => assert_eq!(o, origin),
other => panic!("expected LatestRead; got {other:?}"),
}
assert_eq!(plan.root.target_nodes, vec![42]);
}
#[test]
fn plan_latest_with_no_holders_returns_empty_targets() {
let index = empty_index();
let planner = MeshQueryPlanner::new(&index, rtt_none);
let plan = planner
.plan(&MeshQuery::V1(QueryV1::Latest {
origin: ChainRef::OriginHash(0),
}))
.expect("plan ok");
assert!(plan.root.target_nodes.is_empty());
}
#[test]
fn plan_between_rejects_inverted_range() {
let index = empty_index();
let planner = MeshQueryPlanner::new(&index, rtt_none);
let err = planner
.plan(&MeshQuery::V1(QueryV1::Between {
origin: ChainRef::OriginHash(0),
start: SeqNum(100),
end: SeqNum(50),
}))
.expect_err("inverted range must fail");
match err {
MeshError::PlannerError { detail } => assert!(detail.contains("start < end")),
other => panic!("expected PlannerError; got {other:?}"),
}
}
#[test]
fn plan_between_accepts_valid_range_with_covering_holder() {
let origin = 0x4242_4242_4242_4242_u64;
let index = index_with(vec![(7, caps_with_causal_tip(origin, 1000))]);
let planner = MeshQueryPlanner::new(&index, rtt_none);
let plan = planner
.plan(&MeshQuery::V1(QueryV1::Between {
origin: ChainRef::OriginHash(origin),
start: SeqNum(0),
end: SeqNum(1000),
}))
.expect("plan ok");
match plan.root.operator {
OperatorPlan::BetweenRead { start, end, .. } => {
assert_eq!(start, SeqNum(0));
assert_eq!(end, SeqNum(1000));
}
other => panic!("expected BetweenRead; got {other:?}"),
}
assert_eq!(plan.root.target_nodes, vec![7]);
}
#[test]
fn plan_at_routes_to_holder() {
let origin = 0xCCCC_CCCC_CCCC_CCCC_u64;
let index = make_index_with_holder(99, origin);
let planner = MeshQueryPlanner::new(&index, rtt_none);
let plan = planner
.plan(&MeshQuery::V1(QueryV1::At {
origin: ChainRef::OriginHash(origin),
seq: SeqNum(7),
}))
.expect("plan ok");
match plan.root.operator {
OperatorPlan::AtRead { origin: o, seq } => {
assert_eq!(o, origin);
assert_eq!(seq, SeqNum(7));
}
other => panic!("expected AtRead; got {other:?}"),
}
assert_eq!(plan.root.target_nodes, vec![99]);
}
#[test]
fn plan_holders_lex_sorted_when_no_rtt() {
let origin = 0xEEEE_EEEE_EEEE_EEEE_u64;
let caps = caps_with_causal_presence(origin);
let index = index_with(vec![(200, caps.clone()), (50, caps.clone()), (100, caps)]);
let planner = MeshQueryPlanner::new(&index, rtt_none);
let plan = planner
.plan(&MeshQuery::V1(QueryV1::Latest {
origin: ChainRef::OriginHash(origin),
}))
.unwrap();
assert_eq!(plan.root.target_nodes, vec![50, 100, 200]);
}
#[test]
fn at_picks_holder_whose_tip_covers_seq() {
let origin = 0x1111_2222_3333_4444_u64;
let index = index_with(vec![
(50, caps_with_causal_tip(origin, 50)),
(200, caps_with_causal_tip(origin, 200)),
]);
let planner = MeshQueryPlanner::new(&index, rtt_none);
let plan = planner
.plan(&MeshQuery::V1(QueryV1::At {
origin: ChainRef::OriginHash(origin),
seq: SeqNum(100),
}))
.unwrap();
assert_eq!(plan.root.target_nodes, vec![200]);
}
#[test]
fn between_picks_only_holders_with_full_coverage() {
let origin = 0xFEED_FACE_FEED_FACE_u64;
let index = index_with(vec![
(1, caps_with_causal_range(origin, 0, 400)),
(2, caps_with_causal_range(origin, 50, 600)),
(3, caps_with_causal_tip(origin, 700)),
]);
let planner = MeshQueryPlanner::new(&index, rtt_none);
let plan = planner
.plan(&MeshQuery::V1(QueryV1::Between {
origin: ChainRef::OriginHash(origin),
start: SeqNum(100),
end: SeqNum(500),
}))
.unwrap();
assert_eq!(plan.root.target_nodes, vec![2, 3]);
}
#[test]
fn between_surfaces_historical_range_unavailable_with_hints() {
let origin = 0xDEAD_DEAD_DEAD_DEAD_u64;
let index = index_with(vec![
(1, caps_with_causal_range(origin, 0, 100)),
(2, caps_with_causal_tip(origin, 50)),
]);
let planner = MeshQueryPlanner::new(&index, rtt_none);
let err = planner
.plan(&MeshQuery::V1(QueryV1::Between {
origin: ChainRef::OriginHash(origin),
start: SeqNum(0),
end: SeqNum(500),
}))
.expect_err("no holder covers [0, 500)");
match err {
MeshError::HistoricalRangeUnavailable {
origin: o,
requested,
available,
} => {
assert_eq!(o, origin);
assert_eq!(requested, SeqNum(0)..SeqNum(500));
assert_eq!(
available,
vec![SeqNum(0)..SeqNum(100), SeqNum(0)..SeqNum(51)]
);
}
other => panic!("expected HistoricalRangeUnavailable; got {other:?}"),
}
}
#[test]
fn at_surfaces_historical_range_unavailable_when_no_coverage() {
let origin = 0xBABE_BABE_BABE_BABE_u64;
let index = index_with(vec![(1, caps_with_causal_tip(origin, 50))]);
let planner = MeshQueryPlanner::new(&index, rtt_none);
let err = planner
.plan(&MeshQuery::V1(QueryV1::At {
origin: ChainRef::OriginHash(origin),
seq: SeqNum(100),
}))
.expect_err("seq beyond tip");
match err {
MeshError::HistoricalRangeUnavailable {
requested,
available,
..
} => {
assert_eq!(requested, SeqNum(100)..SeqNum(101));
assert_eq!(available, vec![SeqNum(0)..SeqNum(51)]);
}
other => panic!("expected HistoricalRangeUnavailable; got {other:?}"),
}
}
#[test]
fn presence_form_holder_is_permissive_fallback() {
let origin = 0xFADE_FADE_FADE_FADE_u64;
let index = index_with(vec![(1, caps_with_causal_presence(origin))]);
let planner = MeshQueryPlanner::new(&index, rtt_none);
let plan = planner
.plan(&MeshQuery::V1(QueryV1::At {
origin: ChainRef::OriginHash(origin),
seq: SeqNum(999_999),
}))
.unwrap();
assert_eq!(plan.root.target_nodes, vec![1]);
}
#[test]
fn latest_prefers_holder_with_highest_tip() {
let origin = 0xCAFE_CAFE_CAFE_CAFE_u64;
let index = index_with(vec![
(1, caps_with_causal_tip(origin, 50)),
(2, caps_with_causal_tip(origin, 500)),
(3, caps_with_causal_tip(origin, 200)),
]);
let planner = MeshQueryPlanner::new(&index, rtt_none);
let plan = planner
.plan(&MeshQuery::V1(QueryV1::Latest {
origin: ChainRef::OriginHash(origin),
}))
.unwrap();
assert_eq!(plan.root.target_nodes, vec![2, 3, 1]);
}
#[test]
fn proximity_ordering_breaks_lex_default() {
let origin = 0x3030_3030_3030_3030_u64;
let caps = caps_with_causal_presence(origin);
let index = index_with(vec![(100, caps.clone()), (50, caps.clone()), (200, caps)]);
let rtt = |nid: u64| {
Some(match nid {
50 => Duration::from_millis(10),
200 => Duration::from_millis(20),
100 => Duration::from_millis(30),
_ => return None::<Duration>,
})
};
let planner = MeshQueryPlanner::new(&index, rtt);
let plan = planner
.plan(&MeshQuery::V1(QueryV1::Latest {
origin: ChainRef::OriginHash(origin),
}))
.unwrap();
assert_eq!(plan.root.target_nodes, vec![50, 200, 100]);
}
#[test]
fn unmeasured_rtt_falls_last_lex_among_themselves() {
let origin = 0x7070_7070_7070_7070_u64;
let caps = caps_with_causal_presence(origin);
let index = index_with(vec![
(1, caps.clone()),
(2, caps.clone()),
(3, caps.clone()),
(4, caps),
]);
let rtt = |nid: u64| match nid {
2 => Some(Duration::from_millis(5)),
3 => Some(Duration::from_millis(15)),
_ => None,
};
let planner = MeshQueryPlanner::new(&index, rtt);
let plan = planner
.plan(&MeshQuery::V1(QueryV1::Latest {
origin: ChainRef::OriginHash(origin),
}))
.unwrap();
assert_eq!(plan.root.target_nodes, vec![2, 3, 1, 4]);
}
#[test]
fn coverage_picks_most_specific_claim_when_holder_advertises_multiple() {
let origin = 0x6060_6060_6060_6060_u64;
let hex = chain_hex(origin);
let mut caps = CapabilitySet::new();
caps.tags.insert(causal_tag(hex.clone())); caps.tags.insert(causal_tag(format!("{hex}:100"))); let index = index_with(vec![(7, caps)]);
let planner = MeshQueryPlanner::new(&index, rtt_none);
let err = planner
.plan(&MeshQuery::V1(QueryV1::At {
origin: ChainRef::OriginHash(origin),
seq: SeqNum(150),
}))
.expect_err("most-specific claim (tip) should not cover seq 150");
assert!(matches!(err, MeshError::HistoricalRangeUnavailable { .. }));
}
#[test]
fn plan_chainref_discovered_resolves_via_filter() {
use crate::adapter::net::behavior::predicate::Predicate;
use crate::adapter::net::behavior::tag::{TagKey, TaxonomyAxis};
let origin = 0xCAFE_BEEF_CAFE_BEEF_u64;
let hex = chain_hex(origin);
let mut caps = CapabilitySet::new().add_tag("dataforts.blob.storage");
caps.tags.insert(causal_tag(hex));
let index = index_with(vec![(42, caps)]);
let pred = Predicate::Exists {
key: TagKey {
axis: TaxonomyAxis::Dataforts,
key: "blob.storage".to_string(),
},
};
let planner = MeshQueryPlanner::new(&index, rtt_none);
let plan = planner
.plan(&MeshQuery::V1(QueryV1::Latest {
origin: ChainRef::Discovered(pred.to_wire()),
}))
.expect("Discovered resolution should succeed");
match plan.root.operator {
OperatorPlan::LatestRead { origin: o } => assert_eq!(o, origin),
other => panic!("expected LatestRead; got {other:?}"),
}
assert_eq!(plan.root.target_nodes, vec![42]);
}
#[test]
fn plan_chainref_discovered_no_match_returns_no_capable_holder() {
use crate::adapter::net::behavior::predicate::Predicate;
use crate::adapter::net::behavior::tag::{TagKey, TaxonomyAxis};
let pred = Predicate::Exists {
key: TagKey {
axis: TaxonomyAxis::Dataforts,
key: "blob.storage".to_string(),
},
};
let index = empty_index();
let planner = MeshQueryPlanner::new(&index, rtt_none);
let err = planner
.plan(&MeshQuery::V1(QueryV1::Latest {
origin: ChainRef::Discovered(pred.to_wire()),
}))
.expect_err("Discovered against empty index must surface NoCapableHolder");
match err {
MeshError::NoCapableHolder { requirement, .. } => {
assert!(requirement.contains("Exists"));
}
other => panic!("expected NoCapableHolder; got {other:?}"),
}
}
#[test]
fn plan_chainref_discovered_multiple_origins_surfaces_ambiguous_error() {
use crate::adapter::net::behavior::predicate::Predicate;
use crate::adapter::net::behavior::tag::{TagKey, TaxonomyAxis};
let origin_a = 0x0000_AAAA_AAAA_AAAA_u64;
let origin_b = 0x0000_BBBB_BBBB_BBBB_u64;
let mut caps_a = CapabilitySet::new().add_tag("dataforts.blob.storage");
caps_a.tags.insert(causal_tag(chain_hex(origin_a)));
let mut caps_b = CapabilitySet::new().add_tag("dataforts.blob.storage");
caps_b.tags.insert(causal_tag(chain_hex(origin_b)));
let index = index_with(vec![(1, caps_a), (2, caps_b)]);
let pred = Predicate::Exists {
key: TagKey {
axis: TaxonomyAxis::Dataforts,
key: "blob.storage".to_string(),
},
};
let planner = MeshQueryPlanner::new(&index, rtt_none);
let err = planner
.plan(&MeshQuery::V1(QueryV1::Latest {
origin: ChainRef::Discovered(pred.to_wire()),
}))
.expect_err("two-origin discovery should surface AmbiguousDiscovery");
match err {
MeshError::AmbiguousDiscovery { matches, .. } => {
assert_eq!(matches.len(), 2);
assert!(matches.contains(&origin_a));
assert!(matches.contains(&origin_b));
}
other => panic!("expected AmbiguousDiscovery; got {other:?}"),
}
}
#[test]
fn plan_chainref_discovered_match_with_no_causal_tag_surfaces_no_capable_holder() {
use crate::adapter::net::behavior::predicate::Predicate;
use crate::adapter::net::behavior::tag::{TagKey, TaxonomyAxis};
let caps = CapabilitySet::new().add_tag("dataforts.blob.storage");
let index = index_with(vec![(7, caps)]);
let pred = Predicate::Exists {
key: TagKey {
axis: TaxonomyAxis::Dataforts,
key: "blob.storage".to_string(),
},
};
let planner = MeshQueryPlanner::new(&index, rtt_none);
let err = planner
.plan(&MeshQuery::V1(QueryV1::Latest {
origin: ChainRef::Discovered(pred.to_wire()),
}))
.expect_err("missing causal: tag should surface NoCapableHolder");
assert!(matches!(err, MeshError::NoCapableHolder { .. }));
}
#[test]
fn plan_composite_operator_surfaces_not_yet_implemented() {
let origin = 0x9999_9999_9999_9999_u64;
let index = make_index_with_holder(1, origin);
let planner = MeshQueryPlanner::new(&index, rtt_none);
let q = MeshQuery::V1(QueryV1::Project {
inner: Box::new(MeshQuery::V1(QueryV1::Latest {
origin: ChainRef::OriginHash(origin),
})),
columns: vec![Expr::Field("origin".to_string())],
});
let plan = planner.plan(&q).unwrap();
match plan.root.operator {
OperatorPlan::NotYetImplemented { detail, input } => {
assert!(detail.contains("Project"));
assert!(input.is_some(), "Project's inner sub-plan must be carried");
}
other => panic!("expected NotYetImplemented; got {other:?}"),
}
}
#[test]
fn plan_is_deterministic() {
let origin = 0x5555_5555_5555_5555_u64;
let index = make_index_with_holder(11, origin);
let planner = MeshQueryPlanner::new(&index, rtt_none);
let q = MeshQuery::V1(QueryV1::Latest {
origin: ChainRef::OriginHash(origin),
});
let p1 = planner.plan(&q).unwrap();
let p2 = planner.plan(&q).unwrap();
let e1 = postcard::to_allocvec(&p1).unwrap();
let e2 = postcard::to_allocvec(&p2).unwrap();
assert_eq!(e1, e2, "plan must be deterministic byte-by-byte");
}
#[test]
fn lineage_back_with_multiple_fork_of_tags_is_deterministic() {
let child = 0x0000_0000_0000_00CC_u64;
let parent_a = 0x0000_0000_0000_0001_u64;
let parent_b = 0x0000_0000_0000_0002_u64;
let parent_c = 0x0000_0000_0000_0003_u64;
let mut caps = CapabilitySet::new();
caps.tags.insert(causal_tag(chain_hex(child)));
caps.tags.insert(fork_tag(parent_a));
caps.tags.insert(fork_tag(parent_b));
caps.tags.insert(fork_tag(parent_c));
let index = index_with(vec![
(1, caps),
(2, caps_chain_only(parent_a)),
(3, caps_chain_only(parent_b)),
(4, caps_chain_only(parent_c)),
]);
let mut last_encoding: Option<Vec<u8>> = None;
for _ in 0..32 {
let planner = MeshQueryPlanner::new(&index, rtt_none);
let plan = planner
.plan(&MeshQuery::V1(QueryV1::LineageBack {
origin: ChainRef::OriginHash(child),
max_depth: 1,
}))
.unwrap();
let bytes = postcard::to_allocvec(&plan).unwrap();
if let Some(prev) = &last_encoding {
assert_eq!(
prev, &bytes,
"BFS plan must not depend on HashSet iter order"
);
}
last_encoding = Some(bytes);
}
}
#[test]
fn lineage_back_across_multiple_replica_hosts_is_deterministic() {
let child = 0x0000_0000_0000_DDDD_u64;
let parent_a = 0x0000_0000_0000_0001_u64;
let parent_b = 0x0000_0000_0000_0002_u64;
let index = index_with(vec![
(1, caps_chain_forked_from(child, parent_a)),
(2, caps_chain_forked_from(child, parent_b)),
(3, caps_chain_only(parent_a)),
(4, caps_chain_only(parent_b)),
]);
let mut last_encoding: Option<Vec<u8>> = None;
for _ in 0..32 {
let planner = MeshQueryPlanner::new(&index, rtt_none);
let plan = planner
.plan(&MeshQuery::V1(QueryV1::LineageBack {
origin: ChainRef::OriginHash(child),
max_depth: 1,
}))
.unwrap();
let bytes = postcard::to_allocvec(&plan).unwrap();
if let Some(prev) = &last_encoding {
assert_eq!(
prev, &bytes,
"parent_of must not depend on DashMap iter order across replicas",
);
}
last_encoding = Some(bytes);
}
}
#[test]
fn execution_plan_round_trips_through_postcard() {
let origin = 0x1111_1111_1111_1111_u64;
let index = make_index_with_holder(3, origin);
let planner = MeshQueryPlanner::new(&index, rtt_none);
let p = planner
.plan(&MeshQuery::V1(QueryV1::Latest {
origin: ChainRef::OriginHash(origin),
}))
.unwrap();
let bytes = postcard::to_allocvec(&p).unwrap();
let back: ExecutionPlan = postcard::from_bytes(&bytes).unwrap();
assert_eq!(back, p);
}
#[test]
fn cost_estimate_propagates_rtt() {
let origin = 0x2222_2222_2222_2222_u64;
let index = make_index_with_holder(5, origin);
let rtt = |nid: u64| {
if nid == 5 {
Some(Duration::from_millis(15))
} else {
None
}
};
let planner = MeshQueryPlanner::new(&index, rtt);
let p = planner
.plan(&MeshQuery::V1(QueryV1::Latest {
origin: ChainRef::OriginHash(origin),
}))
.unwrap();
assert_eq!(p.root.cost.latency_ms, 15);
assert_eq!(p.root.cost.bandwidth_bytes, PHASE_A_ATOMIC_BANDWIDTH_BYTES);
}
fn fork_tag(parent_hash: u64) -> Tag {
Tag::Reserved {
prefix: "fork-of:".to_string(),
body: chain_hex(parent_hash),
}
}
fn caps_chain_forked_from(chain: u64, parent: u64) -> CapabilitySet {
let mut caps = CapabilitySet::new();
caps.tags.insert(causal_tag(chain_hex(chain)));
caps.tags.insert(fork_tag(parent));
caps
}
fn caps_chain_tip_forked_from(chain: u64, tip: u64, parent: u64) -> CapabilitySet {
let mut caps = CapabilitySet::new();
caps.tags
.insert(causal_tag(format!("{}:{}", chain_hex(chain), tip)));
caps.tags.insert(fork_tag(parent));
caps
}
fn caps_chain_only(chain: u64) -> CapabilitySet {
let mut caps = CapabilitySet::new();
caps.tags.insert(causal_tag(chain_hex(chain)));
caps
}
#[test]
fn parse_fork_body_round_trips_16_hex() {
assert_eq!(parse_fork_body("00000000deadbeef"), Some(0xDEAD_BEEF));
assert_eq!(
parse_fork_body(&chain_hex(0x1234_5678_9ABC_DEF0)),
Some(0x1234_5678_9ABC_DEF0)
);
}
#[test]
fn parse_fork_body_rejects_short_or_non_hex() {
assert!(parse_fork_body("deadbeef").is_none()); assert!(parse_fork_body("deadbeefcafebabe0").is_none()); assert!(parse_fork_body("zzzzzzzzzzzzzzzz").is_none()); }
#[test]
fn lineage_back_single_root_returns_only_start() {
let root = 0x0000_0000_0000_0001_u64;
let index = index_with(vec![(1, caps_chain_only(root))]);
let planner = MeshQueryPlanner::new(&index, rtt_none);
let plan = planner
.plan(&MeshQuery::V1(QueryV1::LineageBack {
origin: ChainRef::OriginHash(root),
max_depth: 5,
}))
.unwrap();
match plan.root.operator {
OperatorPlan::LineageEmit {
origin,
direction,
entries,
} => {
assert_eq!(origin, root);
assert_eq!(direction, LineageDirection::Back);
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].origin, root);
assert_eq!(entries[0].depth, 0);
assert_eq!(entries[0].tip_seq, None);
}
other => panic!("expected LineageEmit; got {other:?}"),
}
}
#[test]
fn lineage_back_walks_a_long_linear_chain_without_stack_overflow() {
const N: u64 = 500;
let mut holders = Vec::with_capacity(N as usize);
holders.push((1, caps_chain_only(1)));
for i in 1..N {
holders.push((i + 1, caps_chain_forked_from(i + 1, i)));
}
let index = index_with(holders);
let planner = MeshQueryPlanner::new(&index, rtt_none);
let plan = planner
.plan(&MeshQuery::V1(QueryV1::LineageBack {
origin: ChainRef::OriginHash(N),
max_depth: (N + 10) as u32,
}))
.expect("10k-deep walk plans cleanly");
match plan.root.operator {
OperatorPlan::LineageEmit { entries, .. } => {
assert_eq!(entries.len(), N as usize);
assert_eq!(entries[0].origin, N);
assert_eq!(entries[0].depth, 0);
assert_eq!(entries[(N - 1) as usize].origin, 1);
assert_eq!(entries[(N - 1) as usize].depth, (N - 1) as u32);
}
other => panic!("expected LineageEmit; got {other:?}"),
}
}
#[test]
fn lineage_forward_walks_a_wide_fanout_without_stack_overflow() {
const N: u64 = 1_000;
let root = 1;
let mut holders = Vec::with_capacity((N + 1) as usize);
holders.push((1, caps_chain_only(root)));
for i in 0..N {
let child = 2 + i;
holders.push((100 + i, caps_chain_forked_from(child, root)));
}
let index = index_with(holders);
let planner = MeshQueryPlanner::new(&index, rtt_none);
let plan = planner
.plan(&MeshQuery::V1(QueryV1::LineageForward {
origin: ChainRef::OriginHash(root),
max_depth: 5,
}))
.expect("1k-wide fan-out plans cleanly");
match plan.root.operator {
OperatorPlan::LineageEmit { entries, .. } => {
assert_eq!(entries.len(), (N + 1) as usize);
assert_eq!(entries[0].origin, root);
assert_eq!(entries[0].depth, 0);
assert!(entries[1..].iter().all(|e| e.depth == 1));
}
other => panic!("expected LineageEmit; got {other:?}"),
}
}
#[test]
fn lineage_back_walks_through_three_generations() {
let g = 0x0000_0000_0000_00AA_u64;
let p = 0x0000_0000_0000_00BB_u64;
let c = 0x0000_0000_0000_00CC_u64;
let index = index_with(vec![
(10, caps_chain_only(g)),
(20, caps_chain_forked_from(p, g)),
(30, caps_chain_forked_from(c, p)),
]);
let planner = MeshQueryPlanner::new(&index, rtt_none);
let plan = planner
.plan(&MeshQuery::V1(QueryV1::LineageBack {
origin: ChainRef::OriginHash(c),
max_depth: 5,
}))
.unwrap();
match plan.root.operator {
OperatorPlan::LineageEmit { entries, .. } => {
let chain: Vec<u64> = entries.iter().map(|e| e.origin).collect();
assert_eq!(chain, vec![c, p, g]);
let depths: Vec<u32> = entries.iter().map(|e| e.depth).collect();
assert_eq!(depths, vec![0, 1, 2]);
}
other => panic!("expected LineageEmit; got {other:?}"),
}
}
#[test]
fn lineage_back_propagates_tip_seq_from_holders() {
let parent = 0xAA;
let child = 0xBB;
let index = index_with(vec![
(1, caps_chain_tip_forked_from(parent, 99, 0)), (2, caps_chain_tip_forked_from(child, 42, parent)),
]);
let planner = MeshQueryPlanner::new(&index, rtt_none);
let plan = planner
.plan(&MeshQuery::V1(QueryV1::LineageBack {
origin: ChainRef::OriginHash(child),
max_depth: 2,
}))
.unwrap();
match plan.root.operator {
OperatorPlan::LineageEmit { entries, .. } => {
assert_eq!(entries[0].origin, child);
assert_eq!(entries[0].tip_seq, Some(SeqNum(42)));
assert_eq!(entries[1].origin, parent);
assert_eq!(entries[1].tip_seq, Some(SeqNum(99)));
}
other => panic!("expected LineageEmit; got {other:?}"),
}
}
#[test]
fn lineage_back_detects_cycle() {
let a = 0x000A;
let b = 0x000B;
let index = index_with(vec![
(1, caps_chain_forked_from(a, b)),
(2, caps_chain_forked_from(b, a)),
]);
let planner = MeshQueryPlanner::new(&index, rtt_none);
let err = planner
.plan(&MeshQuery::V1(QueryV1::LineageBack {
origin: ChainRef::OriginHash(a),
max_depth: 10,
}))
.unwrap_err();
match err {
MeshError::LineageCycleDetected { origin, cycle } => {
assert_eq!(origin, a);
assert!(cycle.contains(&a), "cycle missing a: {cycle:?}");
assert!(cycle.contains(&b), "cycle missing b: {cycle:?}");
}
other => panic!("expected LineageCycleDetected; got {other:?}"),
}
}
#[test]
fn lineage_back_surfaces_max_depth_exceeded_when_walk_could_continue() {
let g0 = 0x10;
let g1 = 0x11;
let g2 = 0x12;
let g3 = 0x13;
let index = index_with(vec![
(1, caps_chain_only(g0)),
(2, caps_chain_forked_from(g1, g0)),
(3, caps_chain_forked_from(g2, g1)),
(4, caps_chain_forked_from(g3, g2)),
]);
let planner = MeshQueryPlanner::new(&index, rtt_none);
let err = planner
.plan(&MeshQuery::V1(QueryV1::LineageBack {
origin: ChainRef::OriginHash(g3),
max_depth: 2,
}))
.unwrap_err();
match err {
MeshError::LineageMaxDepthExceeded { origin, depth } => {
assert_eq!(origin, g3);
assert_eq!(depth, 2);
}
other => panic!("expected LineageMaxDepthExceeded; got {other:?}"),
}
}
#[test]
fn lineage_back_terminates_exactly_at_max_depth_without_error() {
let g0 = 0x20;
let g1 = 0x21;
let g2 = 0x22;
let index = index_with(vec![
(1, caps_chain_only(g0)),
(2, caps_chain_forked_from(g1, g0)),
(3, caps_chain_forked_from(g2, g1)),
]);
let planner = MeshQueryPlanner::new(&index, rtt_none);
let plan = planner
.plan(&MeshQuery::V1(QueryV1::LineageBack {
origin: ChainRef::OriginHash(g2),
max_depth: 2,
}))
.unwrap();
if let OperatorPlan::LineageEmit { entries, .. } = plan.root.operator {
assert_eq!(
entries.iter().map(|e| e.origin).collect::<Vec<_>>(),
vec![g2, g1, g0]
);
} else {
panic!("expected LineageEmit");
}
}
#[test]
fn lineage_forward_emits_descendants_bfs_sorted() {
let root = 0x100;
let c1 = 0x110;
let c2 = 0x120;
let gc = 0x130;
let index = index_with(vec![
(1, caps_chain_only(root)),
(2, caps_chain_forked_from(c1, root)),
(3, caps_chain_forked_from(c2, root)),
(4, caps_chain_forked_from(gc, c1)),
]);
let planner = MeshQueryPlanner::new(&index, rtt_none);
let plan = planner
.plan(&MeshQuery::V1(QueryV1::LineageForward {
origin: ChainRef::OriginHash(root),
max_depth: 5,
}))
.unwrap();
match plan.root.operator {
OperatorPlan::LineageEmit {
direction, entries, ..
} => {
assert_eq!(direction, LineageDirection::Forward);
let chain: Vec<u64> = entries.iter().map(|e| e.origin).collect();
assert_eq!(chain, vec![root, c1, c2, gc]);
let depths: Vec<u32> = entries.iter().map(|e| e.depth).collect();
assert_eq!(depths, vec![0, 1, 1, 2]);
}
other => panic!("expected LineageEmit; got {other:?}"),
}
}
#[test]
fn lineage_forward_surfaces_max_depth_when_descendants_remain() {
let root = 0x200;
let c1 = 0x210;
let gc = 0x220;
let index = index_with(vec![
(1, caps_chain_only(root)),
(2, caps_chain_forked_from(c1, root)),
(3, caps_chain_forked_from(gc, c1)),
]);
let planner = MeshQueryPlanner::new(&index, rtt_none);
let err = planner
.plan(&MeshQuery::V1(QueryV1::LineageForward {
origin: ChainRef::OriginHash(root),
max_depth: 1,
}))
.unwrap_err();
match err {
MeshError::LineageMaxDepthExceeded { origin, depth } => {
assert_eq!(origin, root);
assert_eq!(depth, 1);
}
other => panic!("expected LineageMaxDepthExceeded; got {other:?}"),
}
}
#[test]
fn lineage_forward_with_no_descendants_returns_only_start() {
let leaf = 0x300;
let index = index_with(vec![(1, caps_chain_only(leaf))]);
let planner = MeshQueryPlanner::new(&index, rtt_none);
let plan = planner
.plan(&MeshQuery::V1(QueryV1::LineageForward {
origin: ChainRef::OriginHash(leaf),
max_depth: 10,
}))
.unwrap();
if let OperatorPlan::LineageEmit { entries, .. } = plan.root.operator {
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].origin, leaf);
} else {
panic!("expected LineageEmit");
}
}
#[test]
fn lineage_back_with_max_depth_zero_returns_only_start_no_error() {
let g0 = 0x40;
let g1 = 0x41;
let index = index_with(vec![
(1, caps_chain_only(g0)),
(2, caps_chain_forked_from(g1, g0)),
]);
let planner = MeshQueryPlanner::new(&index, rtt_none);
let plan = planner
.plan(&MeshQuery::V1(QueryV1::LineageBack {
origin: ChainRef::OriginHash(g1),
max_depth: 0,
}))
.expect("max_depth=0 must succeed even when a parent exists");
match plan.root.operator {
OperatorPlan::LineageEmit { entries, .. } => {
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].origin, g1);
assert_eq!(entries[0].depth, 0);
}
other => panic!("expected LineageEmit; got {other:?}"),
}
}
#[test]
fn lineage_forward_with_max_depth_zero_returns_only_start_no_error() {
let parent = 0x50;
let child = 0x51;
let index = index_with(vec![
(1, caps_chain_only(parent)),
(2, caps_chain_forked_from(child, parent)),
]);
let planner = MeshQueryPlanner::new(&index, rtt_none);
let plan = planner
.plan(&MeshQuery::V1(QueryV1::LineageForward {
origin: ChainRef::OriginHash(parent),
max_depth: 0,
}))
.expect("max_depth=0 must succeed even when descendants exist");
match plan.root.operator {
OperatorPlan::LineageEmit { entries, .. } => {
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].origin, parent);
assert_eq!(entries[0].depth, 0);
}
other => panic!("expected LineageEmit; got {other:?}"),
}
}
#[test]
fn lineage_emit_round_trips_through_postcard() {
let parent = 0xAA;
let child = 0xBB;
let index = index_with(vec![
(1, caps_chain_only(parent)),
(2, caps_chain_forked_from(child, parent)),
]);
let planner = MeshQueryPlanner::new(&index, rtt_none);
let plan = planner
.plan(&MeshQuery::V1(QueryV1::LineageBack {
origin: ChainRef::OriginHash(child),
max_depth: 5,
}))
.unwrap();
let bytes = postcard::to_allocvec(&plan).unwrap();
let decoded: ExecutionPlan = postcard::from_bytes(&bytes).unwrap();
assert_eq!(decoded, plan);
}
fn join_query(
left_field: &str,
right_field: &str,
left_chain: u64,
right_chain: u64,
) -> MeshQuery {
MeshQuery::V1(QueryV1::Join {
left: Box::new(MeshQuery::V1(QueryV1::Latest {
origin: ChainRef::OriginHash(left_chain),
})),
right: Box::new(MeshQuery::V1(QueryV1::Latest {
origin: ChainRef::OriginHash(right_chain),
})),
on: JoinKey {
left_field: Expr::Field(left_field.to_string()),
right_field: Expr::Field(right_field.to_string()),
},
kind: JoinKind::Inner,
watermark: Duration::from_secs(5),
})
}
#[test]
fn plan_join_on_origin_produces_hash_join_with_origin_key() {
let l = 0x1111;
let r = 0x2222;
let index = index_with(vec![
(1, caps_with_causal_presence(l)),
(2, caps_with_causal_presence(r)),
]);
let planner = MeshQueryPlanner::new(&index, rtt_none);
let plan = planner.plan(&join_query("origin", "origin", l, r)).unwrap();
match plan.root.operator {
OperatorPlan::HashJoin {
key_mode,
kind,
strategy,
watermark,
left,
right,
} => {
assert_eq!(key_mode, JoinKeyMode::Origin);
assert_eq!(kind, JoinKind::Inner);
assert_eq!(strategy, JoinStrategy::HashBroadcast);
assert_eq!(watermark, Duration::from_secs(5));
assert!(matches!(left.operator, OperatorPlan::LatestRead { .. }));
assert!(matches!(right.operator, OperatorPlan::LatestRead { .. }));
}
other => panic!("expected HashJoin; got {other:?}"),
}
}
#[test]
fn plan_join_on_seq_produces_seq_key_mode() {
let l = 0x3333;
let r = 0x4444;
let index = index_with(vec![
(1, caps_with_causal_presence(l)),
(2, caps_with_causal_presence(r)),
]);
let planner = MeshQueryPlanner::new(&index, rtt_none);
let plan = planner.plan(&join_query("seq", "seq", l, r)).unwrap();
if let OperatorPlan::HashJoin { key_mode, .. } = plan.root.operator {
assert_eq!(key_mode, JoinKeyMode::Seq);
} else {
panic!("expected HashJoin");
}
}
#[test]
fn plan_join_with_mismatched_field_names_surfaces_planner_error() {
let l = 0x5555;
let r = 0x6666;
let index = index_with(vec![
(1, caps_with_causal_presence(l)),
(2, caps_with_causal_presence(r)),
]);
let planner = MeshQueryPlanner::new(&index, rtt_none);
let err = planner
.plan(&join_query("origin", "seq", l, r))
.unwrap_err();
match err {
MeshError::PlannerError { detail } => {
assert!(detail.contains("same field name"), "got: {detail}");
}
other => panic!("expected PlannerError; got {other:?}"),
}
}
#[test]
fn plan_join_on_payload_field_produces_field_key_mode() {
let l = 0x7777;
let r = 0x8888;
let index = index_with(vec![
(1, caps_with_causal_presence(l)),
(2, caps_with_causal_presence(r)),
]);
let planner = MeshQueryPlanner::new(&index, rtt_none);
let plan = planner
.plan(&join_query(
"payload.request_id",
"payload.request_id",
l,
r,
))
.unwrap();
match plan.root.operator {
OperatorPlan::HashJoin { key_mode, .. } => {
assert_eq!(
key_mode,
JoinKeyMode::Field("payload.request_id".to_string())
);
}
other => panic!("expected HashJoin; got {other:?}"),
}
}
#[test]
fn plan_join_with_non_field_expression_surfaces_planner_error() {
let l = 0x9999;
let r = 0xAAAA;
let index = index_with(vec![
(1, caps_with_causal_presence(l)),
(2, caps_with_causal_presence(r)),
]);
let planner = MeshQueryPlanner::new(&index, rtt_none);
let q = MeshQuery::V1(QueryV1::Join {
left: Box::new(MeshQuery::V1(QueryV1::Latest {
origin: ChainRef::OriginHash(l),
})),
right: Box::new(MeshQuery::V1(QueryV1::Latest {
origin: ChainRef::OriginHash(r),
})),
on: JoinKey {
left_field: Expr::LitString("origin".to_string()),
right_field: Expr::Field("origin".to_string()),
},
kind: JoinKind::Inner,
watermark: Duration::from_secs(5),
});
let err = planner.plan(&q).unwrap_err();
match err {
MeshError::PlannerError { detail } => {
assert!(detail.contains("field reference"));
}
other => panic!("expected PlannerError; got {other:?}"),
}
}
#[test]
fn plan_join_round_trips_through_postcard() {
let l = 0xCCCC;
let r = 0xDDDD;
let index = index_with(vec![
(1, caps_with_causal_presence(l)),
(2, caps_with_causal_presence(r)),
]);
let planner = MeshQueryPlanner::new(&index, rtt_none);
let plan = planner.plan(&join_query("origin", "origin", l, r)).unwrap();
let bytes = postcard::to_allocvec(&plan).unwrap();
let decoded: ExecutionPlan = postcard::from_bytes(&bytes).unwrap();
assert_eq!(decoded, plan);
}
fn aggregate_query(origin: u64, group_by: Vec<Expr>, agg_fn: AggregateFn) -> MeshQuery {
MeshQuery::V1(QueryV1::Aggregate {
inner: Box::new(MeshQuery::V1(QueryV1::Latest {
origin: ChainRef::OriginHash(origin),
})),
group_by,
agg_fn,
})
}
#[test]
fn plan_aggregate_count_no_group_by() {
let origin = 0x1111;
let index = make_index_with_holder(1, origin);
let planner = MeshQueryPlanner::new(&index, rtt_none);
let plan = planner
.plan(&aggregate_query(origin, vec![], AggregateFn::Count))
.unwrap();
match plan.root.operator {
OperatorPlan::AggregateCount { input, group_by } => {
assert_eq!(group_by, None);
assert!(matches!(input.operator, OperatorPlan::LatestRead { .. }));
}
other => panic!("expected AggregateCount; got {other:?}"),
}
}
#[test]
fn plan_aggregate_count_group_by_origin() {
let origin = 0x2222;
let index = make_index_with_holder(1, origin);
let planner = MeshQueryPlanner::new(&index, rtt_none);
let plan = planner
.plan(&aggregate_query(
origin,
vec![Expr::Field("origin".to_string())],
AggregateFn::Count,
))
.unwrap();
if let OperatorPlan::AggregateCount { group_by, .. } = plan.root.operator {
assert_eq!(group_by, Some(JoinKeyMode::Origin));
} else {
panic!("expected AggregateCount");
}
}
#[test]
fn plan_aggregate_count_group_by_seq() {
let origin = 0x3333;
let index = make_index_with_holder(1, origin);
let planner = MeshQueryPlanner::new(&index, rtt_none);
let plan = planner
.plan(&aggregate_query(
origin,
vec![Expr::Field("seq".to_string())],
AggregateFn::Count,
))
.unwrap();
if let OperatorPlan::AggregateCount { group_by, .. } = plan.root.operator {
assert_eq!(group_by, Some(JoinKeyMode::Seq));
} else {
panic!("expected AggregateCount");
}
}
#[test]
fn plan_aggregate_count_group_by_origin_seq_composite() {
let origin = 0x4444;
let index = make_index_with_holder(1, origin);
let planner = MeshQueryPlanner::new(&index, rtt_none);
let plan = planner
.plan(&aggregate_query(
origin,
vec![
Expr::Field("origin".to_string()),
Expr::Field("seq".to_string()),
],
AggregateFn::Count,
))
.unwrap();
if let OperatorPlan::AggregateCount { group_by, .. } = plan.root.operator {
assert_eq!(group_by, Some(JoinKeyMode::OriginSeq));
} else {
panic!("expected AggregateCount");
}
}
#[test]
fn plan_aggregate_sum_produces_aggregate_numeric() {
let origin = 0x5555;
let index = make_index_with_holder(1, origin);
let planner = MeshQueryPlanner::new(&index, rtt_none);
let plan = planner
.plan(&aggregate_query(
origin,
vec![],
AggregateFn::Sum {
field: Expr::Field("amount".to_string()),
},
))
.unwrap();
match plan.root.operator {
OperatorPlan::AggregateNumeric {
group_by,
field_path,
kind,
..
} => {
assert_eq!(group_by, None);
assert_eq!(field_path, "amount");
assert_eq!(kind, super::super::query::NumericAggregateKind::Sum);
}
other => panic!("expected AggregateNumeric; got {other:?}"),
}
}
#[test]
fn plan_aggregate_avg_with_group_by_produces_aggregate_numeric() {
let origin = 0x5556;
let index = make_index_with_holder(1, origin);
let planner = MeshQueryPlanner::new(&index, rtt_none);
let plan = planner
.plan(&aggregate_query(
origin,
vec![Expr::Field("origin".to_string())],
AggregateFn::Avg {
field: Expr::Field("latency".to_string()),
},
))
.unwrap();
if let OperatorPlan::AggregateNumeric {
group_by,
field_path,
kind,
..
} = plan.root.operator
{
assert_eq!(group_by, Some(JoinKeyMode::Origin));
assert_eq!(field_path, "latency");
assert_eq!(kind, super::super::query::NumericAggregateKind::Avg);
} else {
panic!("expected AggregateNumeric");
}
}
#[test]
fn plan_aggregate_non_field_arg_surfaces_planner_error() {
let origin = 0x5557;
let index = make_index_with_holder(1, origin);
let planner = MeshQueryPlanner::new(&index, rtt_none);
let err = planner
.plan(&aggregate_query(
origin,
vec![],
AggregateFn::Sum {
field: Expr::LitInt(42),
},
))
.unwrap_err();
match err {
MeshError::PlannerError { detail } => {
assert!(detail.contains("Expr::Field"), "got: {detail}");
}
other => panic!("expected PlannerError; got {other:?}"),
}
}
#[test]
fn plan_aggregate_sketch_function_still_surfaces_planner_error() {
let origin = 0x5558;
let index = make_index_with_holder(1, origin);
let planner = MeshQueryPlanner::new(&index, rtt_none);
let err = planner
.plan(&aggregate_query(
origin,
vec![],
AggregateFn::DistinctCountHll {
field: Expr::Field("user_id".to_string()),
},
))
.unwrap_err();
match err {
MeshError::PlannerError { detail } => {
assert!(detail.contains("sketch implementation"), "got: {detail}");
assert!(detail.contains("DistinctCountExact"), "got: {detail}");
}
other => panic!("expected PlannerError; got {other:?}"),
}
}
#[test]
fn plan_aggregate_group_by_payload_field_surfaces_planner_error() {
let origin = 0x6666;
let index = make_index_with_holder(1, origin);
let planner = MeshQueryPlanner::new(&index, rtt_none);
let err = planner
.plan(&aggregate_query(
origin,
vec![Expr::Field("payload.severity".to_string())],
AggregateFn::Count,
))
.unwrap_err();
match err {
MeshError::PlannerError { detail } => {
assert!(detail.contains("row-intrinsic"));
}
other => panic!("expected PlannerError; got {other:?}"),
}
}
#[test]
fn plan_aggregate_round_trips_through_postcard() {
let origin = 0x7777;
let index = make_index_with_holder(1, origin);
let planner = MeshQueryPlanner::new(&index, rtt_none);
let plan = planner
.plan(&aggregate_query(
origin,
vec![Expr::Field("origin".to_string())],
AggregateFn::Count,
))
.unwrap();
let bytes = postcard::to_allocvec(&plan).unwrap();
let decoded: ExecutionPlan = postcard::from_bytes(&bytes).unwrap();
assert_eq!(decoded, plan);
}
}