use std::time::Duration;
use serde::{Deserialize, Serialize};
use crate::adapter::net::behavior::predicate::PredicateWire;
/// Top-level, versioned envelope for a mesh query.
///
/// The variant name acts as the schema version. It is serialized as the enum
/// discriminant (the JSON form carries `"V1"`), so a decoder can tell which query
/// grammar follows. Presumably later grammars are meant to be added as further
/// variants (`V2`, ...) — NOTE(review): confirm the intended evolution policy.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum MeshQuery {
    /// Version-1 query grammar; see [`QueryV1`].
    V1(QueryV1),
}
/// Version-1 query grammar.
///
/// The leaf variants (`At`, `Between`, `Latest`, `LineageBack`, `LineageForward`)
/// name a chain through a [`ChainRef`]. The remaining variants wrap one or two boxed
/// [`MeshQuery`] values, so queries compose into a tree.
///
/// Wire-format note: postcard encodes an enum variant by its index, so reordering
/// or inserting variants changes the binary encoding. Append new variants at the end.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
// `#[non_exhaustive]`: code outside this crate must use a wildcard arm when matching,
// so new operators can be added without breaking it.
#[non_exhaustive]
pub enum QueryV1 {
    /// Point lookup — presumably the entry numbered `seq` on chain `origin`.
    At {
        origin: ChainRef,
        seq: SeqNum,
    },
    /// Range lookup from `start` to `end` on chain `origin`.
    /// NOTE(review): confirm whether `end` is inclusive or exclusive.
    Between {
        origin: ChainRef,
        start: SeqNum,
        end: SeqNum,
    },
    /// Presumably the most recent entry on chain `origin`.
    Latest {
        origin: ChainRef,
    },
    /// Lineage walk starting at `origin`, limited to `max_depth` steps
    /// (direction presumed to be towards ancestors, per the name).
    LineageBack {
        origin: ChainRef,
        max_depth: u32,
    },
    /// Lineage walk starting at `origin`, limited to `max_depth` steps
    /// (direction presumed to be towards descendants, per the name).
    LineageForward {
        origin: ChainRef,
        max_depth: u32,
    },
    /// Joins the rows of `left` and `right` on the expressions in `on`.
    ///
    /// `kind` selects which unmatched rows are kept. `watermark` is a time bound for
    /// the join — presumably how long to wait for late matches (NOTE(review): confirm).
    /// [`clamp_join_watermark_secs`] turns an optional seconds value into a
    /// `Duration` for this field.
    Join {
        left: Box<MeshQuery>,
        right: Box<MeshQuery>,
        on: JoinKey,
        kind: JoinKind,
        watermark: Duration,
    },
    /// Keeps the rows of `inner` that satisfy `predicate`. The predicate is stored in
    /// its wire form (the tests build it with `Predicate::to_wire`).
    Filter {
        inner: Box<MeshQuery>,
        predicate: PredicateWire,
    },
    /// Applies `agg_fn` to the rows of `inner`, grouped by the `group_by` expressions.
    Aggregate {
        inner: Box<MeshQuery>,
        group_by: Vec<Expr>,
        agg_fn: AggregateFn,
    },
    /// Projects the rows of `inner` onto `columns`.
    Project {
        inner: Box<MeshQuery>,
        columns: Vec<Expr>,
    },
    /// Sorts the rows of `inner` by the keys in `by`. The optional `limit` presumably
    /// caps how many rows are returned after sorting.
    OrderBy {
        inner: Box<MeshQuery>,
        by: Vec<OrderKey>,
        limit: Option<u64>,
    },
    /// Splits the rows of `inner` into windows described by `spec`.
    Window {
        inner: Box<MeshQuery>,
        spec: WindowSpec,
    },
}
/// How [`QueryV1::Window`] slices its input.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum WindowSpec {
    /// Tumbling window keyed on sequence numbers — presumably non-overlapping windows
    /// that each span `size` consecutive sequence numbers.
    /// NOTE(review): `size == 0` is not rejected by this type.
    TumblingSeq {
        size: u64,
    },
}
/// One window emitted by a windowed query: its sequence-number bounds and the rows
/// assigned to it.
/// NOTE(review): inclusivity of `end` is not established here — confirm with the executor.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct WindowBoundary {
    /// First sequence number covered by the window.
    pub start: SeqNum,
    /// Closing sequence number of the window.
    pub end: SeqNum,
    /// Rows that fell inside the window.
    pub rows: Vec<ResultRow>,
}
/// Names the chain(s) a leaf query reads from.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum ChainRef {
    /// A single chain identified directly by its 64-bit origin hash.
    OriginHash(u64),
    /// Chain(s) chosen by matching a predicate (wire form). Presumably resolved when
    /// the query is executed — NOTE(review): confirm.
    Discovered(PredicateWire),
}
/// Sequence number of an entry within a chain.
///
/// A newtype over `u64`, so sequence numbers are not mixed up with other integer ids.
/// It converts to and from `u64` through `From`, and derives `Ord`/`Hash` so it can be
/// sorted and used as a map key.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct SeqNum(pub u64);
impl From<u64> for SeqNum {
fn from(value: u64) -> Self {
Self(value)
}
}
impl From<SeqNum> for u64 {
fn from(value: SeqNum) -> Self {
value.0
}
}
/// Which unmatched rows a [`QueryV1::Join`] keeps. The names suggest SQL join
/// semantics — NOTE(review): confirm against the join executor.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum JoinKind {
    /// Only rows with a match on both sides.
    Inner,
    /// All left rows; right side absent when unmatched.
    LeftOuter,
    /// All right rows; left side absent when unmatched.
    RightOuter,
    /// All rows from both sides.
    FullOuter,
}
/// Join condition: `left_field` is evaluated on rows from the left input and
/// `right_field` on rows from the right input. Presumably rows pair up when the two
/// values are equal.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct JoinKey {
    /// Expression evaluated against left-hand rows.
    pub left_field: Expr,
    /// Expression evaluated against right-hand rows.
    pub right_field: Expr,
}
/// Aggregate function applied by [`QueryV1::Aggregate`].
///
/// Every variant except `Count` reads its input values from `field`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum AggregateFn {
    /// Row count (takes no input field).
    Count,
    /// Sum of `field`.
    Sum {
        field: Expr,
    },
    /// Mean of `field`.
    Avg {
        field: Expr,
    },
    /// Minimum of `field`.
    Min {
        field: Expr,
    },
    /// Maximum of `field`.
    Max {
        field: Expr,
    },
    /// Approximate distinct count of `field` — presumably a HyperLogLog sketch, per the name.
    DistinctCountHll {
        field: Expr,
    },
    /// Exact distinct count of `field`.
    DistinctCountExact {
        field: Expr,
    },
    /// Approximate `p`-th percentile of `field` — presumably via a t-digest, per the name.
    /// NOTE(review): the range of `p` is not validated here. The tests use `0.99`,
    /// which suggests a fraction in `[0, 1]`.
    PercentileTDigest {
        field: Expr,
        p: f64,
    },
    /// Exact `p`-th percentile of `field`. Same `p` caveat as `PercentileTDigest`.
    PercentileExact {
        field: Expr,
        p: f64,
    },
}
/// Minimal expression language used by grouping, projection, ordering and joins:
/// either a reference to a named field or a literal value.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum Expr {
    /// Reference to a field by name (e.g. `"operator_id"` in the tests).
    Field(String),
    /// String literal.
    LitString(String),
    /// Signed 64-bit integer literal.
    LitInt(i64),
    /// Floating-point literal.
    LitFloat(f64),
    /// Boolean literal.
    LitBool(bool),
}
/// Sort direction for an [`OrderKey`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum OrderDir {
    /// Ascending.
    Asc,
    /// Descending.
    Desc,
}
/// One sort key of a [`QueryV1::OrderBy`]: the expression to sort on and its direction.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct OrderKey {
    /// Expression whose value is compared.
    pub field: Expr,
    /// Ascending or descending.
    pub dir: OrderDir,
}
/// One row produced by a query.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ResultRow {
    /// Origin hash of the chain the row came from (presumably the same value space as
    /// [`ChainRef::OriginHash`]).
    pub origin: u64,
    /// Sequence number of the row within its chain.
    pub seq: SeqNum,
    /// Opaque row bytes. NOTE(review): for aggregate/join results this presumably holds
    /// an encoded `AggregateRowPayload` / `JoinedRowPayload` — confirm the encoding.
    pub payload: Vec<u8>,
}
/// Key identifying the group an aggregate result belongs to. Derives `Eq` + `Hash`,
/// so it can key a hash map.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum GroupKey {
    /// Grouped by origin hash only.
    Origin(u64),
    /// Grouped by sequence number only.
    Seq(SeqNum),
    /// Grouped by the (origin, sequence number) pair.
    OriginSeq {
        origin: u64,
        seq: SeqNum,
    },
}
/// Payload of one aggregate result: the group it belongs to and the computed value.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct AggregateRowPayload {
    /// Group of this result; presumably `None` for an ungrouped aggregate.
    pub group: Option<GroupKey>,
    /// Aggregated value.
    pub value: AggregateValue,
}
/// Result of an [`AggregateFn`].
///
/// The `Option`-carrying variants presumably use `None` when there were no input
/// values to aggregate — NOTE(review): confirm.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum AggregateValue {
    /// Row count.
    Count(u64),
    /// Sum (always present).
    Sum(f64),
    /// Mean, if defined.
    Avg(Option<f64>),
    /// Minimum, if any value was seen.
    Min(Option<f64>),
    /// Maximum, if any value was seen.
    Max(Option<f64>),
    /// Distinct count. This one variant presumably carries both the exact and the
    /// HLL result.
    DistinctCount(u64),
    /// Percentile, if defined. This one variant presumably carries both the exact and
    /// the t-digest result.
    Percentile(Option<f64>),
}
/// Classification of numeric reductions that keep or select values (min, max,
/// percentile). NOTE(review): the consumer is not visible here; presumably an
/// executor-side tag — confirm.
#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)]
pub enum NumericReductionKind {
    /// Minimum.
    Min,
    /// Maximum.
    Max,
    /// `p`-th percentile (range of `p` not validated here).
    Percentile {
        p: f64,
    },
}
/// Classification of numeric aggregates that accumulate a running total (sum, average).
/// NOTE(review): the consumer is not visible here; presumably an executor-side tag — confirm.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum NumericAggregateKind {
    /// Sum.
    Sum,
    /// Average.
    Avg,
}
/// Payload of one joined row: the matching left and right input rows. Presumably a
/// side is `None` when an outer join emits a row with no match on that side.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct JoinedRowPayload {
    /// Row from the left input, if any.
    pub left: Option<ResultRow>,
    /// Row from the right input, if any.
    pub right: Option<ResultRow>,
}
/// Fallback join watermark, in whole seconds. [`clamp_join_watermark_secs`] returns
/// this whenever the caller gives no usable value.
pub const DEFAULT_JOIN_WATERMARK_SECS: u64 = 5;

/// Converts an optional, caller-supplied join watermark in seconds into a [`Duration`].
///
/// A finite, non-negative value that fits in a `Duration` is converted as-is, keeping
/// sub-second precision. Every other input falls back to
/// [`DEFAULT_JOIN_WATERMARK_SECS`]:
///
/// * `None`
/// * `NaN`, `+inf`, `-inf`
/// * negative values
/// * finite values too large for a `Duration` (above roughly `1.8e19` seconds)
///
/// This function never panics, so untrusted input can be passed straight in.
pub fn clamp_join_watermark_secs(secs: Option<f64>) -> Duration {
    let fallback = Duration::from_secs(DEFAULT_JOIN_WATERMARK_SECS);
    match secs {
        // Use `try_from_secs_f64`, not `from_secs_f64`. The latter panics when `s`
        // overflows `Duration` (e.g. `1e20`), and the finiteness check alone does not
        // rule that out. Treat overflow like infinity: fall back to the default.
        Some(s) if s.is_finite() && s >= 0.0 => {
            Duration::try_from_secs_f64(s).unwrap_or(fallback)
        }
        _ => fallback,
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes `value` with postcard and decodes it straight back.
    /// An error in either direction panics and fails the calling test.
    fn postcard_echo<T>(value: &T) -> T
    where
        T: Serialize + for<'de> Deserialize<'de>,
    {
        let wire = postcard::to_allocvec(value).unwrap();
        postcard::from_bytes(&wire).unwrap()
    }

    /// Smallest useful query: `Latest` on a fixed origin hash.
    fn small_query() -> MeshQuery {
        let origin = ChainRef::OriginHash(0xABAB_ABAB_ABAB_ABAB);
        MeshQuery::V1(QueryV1::Latest { origin })
    }

    /// `Between` lookup with `start = 0` and `end = 12345` on a fixed origin hash.
    fn between_query() -> MeshQuery {
        let origin = ChainRef::OriginHash(0x4242_4242_4242_4242);
        let (start, end) = (SeqNum(0), SeqNum(12345));
        MeshQuery::V1(QueryV1::Between { origin, start, end })
    }

    /// `Count` grouped by `operator_id` over [`between_query`], with no predicate.
    /// The postcard round-trip test uses this fixture but not
    /// [`complex_query_with_filter`]; presumably the predicate wire form is not
    /// postcard-compatible — NOTE(review): confirm.
    fn complex_query_postcardable() -> MeshQuery {
        let inner = Box::new(between_query());
        let group_by = vec![Expr::Field("operator_id".to_string())];
        MeshQuery::V1(QueryV1::Aggregate {
            inner,
            group_by,
            agg_fn: AggregateFn::Count,
        })
    }

    /// Same aggregate as [`complex_query_postcardable`], but over a `Filter` that
    /// requires the `Dataforts` tag `blob.storage` to exist.
    fn complex_query_with_filter() -> MeshQuery {
        use crate::adapter::net::behavior::predicate::Predicate;
        use crate::adapter::net::behavior::tag::{TagKey, TaxonomyAxis};

        let tag = TagKey {
            axis: TaxonomyAxis::Dataforts,
            key: "blob.storage".to_string(),
        };
        let exists = Predicate::Exists { key: tag };
        let filtered = MeshQuery::V1(QueryV1::Filter {
            inner: Box::new(between_query()),
            predicate: exists.to_wire(),
        });

        let group_by = vec![Expr::Field("operator_id".to_string())];
        MeshQuery::V1(QueryV1::Aggregate {
            inner: Box::new(filtered),
            group_by,
            agg_fn: AggregateFn::Count,
        })
    }

    #[test]
    fn meshquery_round_trips_through_postcard() {
        let queries = [small_query(), between_query(), complex_query_postcardable()];
        for original in queries {
            let bytes = postcard::to_allocvec(&original).expect("encode");
            let decoded: MeshQuery = postcard::from_bytes(&bytes).expect("decode");
            assert_eq!(decoded, original);
        }
    }

    #[test]
    fn meshquery_round_trips_through_json() {
        let queries = [
            small_query(),
            between_query(),
            complex_query_postcardable(),
            complex_query_with_filter(),
        ];
        for original in &queries {
            let text = serde_json::to_string(original).expect("encode");
            let decoded: MeshQuery = serde_json::from_str(&text).expect("decode");
            assert_eq!(&decoded, original);
        }
    }

    #[test]
    fn version_tag_visible_in_json() {
        let body = serde_json::to_string(&small_query()).unwrap();
        let carries_tag = body.contains("\"V1\"");
        assert!(
            carries_tag,
            "JSON form must carry the V1 discriminant; got: {body}"
        );
    }

    #[test]
    fn seq_num_from_u64_round_trips() {
        let seq: SeqNum = 42u64.into();
        assert_eq!(u64::from(seq), 42);
    }

    #[test]
    fn chainref_originhash_round_trips() {
        let chain = ChainRef::OriginHash(0xCDCD_CDCD_CDCD_CDCD);
        assert_eq!(postcard_echo(&chain), chain);
    }

    #[test]
    fn aggregatefn_distinct_hll_round_trips() {
        let agg = AggregateFn::DistinctCountHll {
            field: Expr::Field("user_id".to_string()),
        };
        assert_eq!(postcard_echo(&agg), agg);
    }

    #[test]
    fn aggregatefn_percentile_round_trips_with_p() {
        let agg = AggregateFn::PercentileTDigest {
            field: Expr::Field("latency_ms".to_string()),
            p: 0.99,
        };
        assert_eq!(postcard_echo(&agg), agg);
    }

    #[test]
    fn join_round_trips_with_watermark() {
        let key = JoinKey {
            left_field: Expr::Field("request_id".to_string()),
            right_field: Expr::Field("request_id".to_string()),
        };
        let join = MeshQuery::V1(QueryV1::Join {
            left: Box::new(small_query()),
            right: Box::new(between_query()),
            on: key,
            kind: JoinKind::Inner,
            watermark: Duration::from_secs(5),
        });
        assert_eq!(postcard_echo(&join), join);
    }

    #[test]
    fn resultrow_round_trips_through_postcard() {
        let row = ResultRow {
            origin: 0x7777_7777_7777_7777,
            seq: SeqNum(1024),
            payload: b"the bytes".to_vec(),
        };
        assert_eq!(postcard_echo(&row), row);
    }

    #[test]
    fn clamp_join_watermark_passes_through_finite_non_negative_seconds() {
        let cases = [
            (0.0, Duration::from_secs(0)),
            (2.5, Duration::from_secs_f64(2.5)),
            (300.0, Duration::from_secs(300)),
        ];
        for (secs, expected) in cases {
            assert_eq!(clamp_join_watermark_secs(Some(secs)), expected);
        }
    }

    #[test]
    fn clamp_join_watermark_falls_back_to_default_on_none() {
        let fallback = Duration::from_secs(DEFAULT_JOIN_WATERMARK_SECS);
        assert_eq!(clamp_join_watermark_secs(None), fallback);
    }

    #[test]
    fn clamp_join_watermark_falls_back_to_default_on_non_finite() {
        let fallback = Duration::from_secs(DEFAULT_JOIN_WATERMARK_SECS);
        for bad in [f64::NAN, f64::INFINITY, f64::NEG_INFINITY] {
            assert_eq!(
                clamp_join_watermark_secs(Some(bad)),
                fallback,
                "bad input {bad:?} should clamp to default",
            );
        }
    }

    #[test]
    fn clamp_join_watermark_falls_back_to_default_on_negative() {
        let fallback = Duration::from_secs(DEFAULT_JOIN_WATERMARK_SECS);
        for bad in [-0.001_f64, -1.0, -1e9] {
            assert_eq!(
                clamp_join_watermark_secs(Some(bad)),
                fallback,
                "negative {bad} should clamp to default",
            );
        }
    }
}