// nodedb_cluster/distributed_array/wire.rs
// SPDX-License-Identifier: BUSL-1.1

//! Wire types for array shard RPC messages.
//!
//! All request/response pairs are zerompk-serialisable. Complex sub-payloads
//! (slice predicates, cell batches) are carried as opaque msgpack bytes —
//! the same convention used by `ArrayOp` in the bridge physical plan — so
//! this crate does not need a compile-time dependency on `nodedb-array`.

/// Scatter request: coordinator asks a shard to execute a coord-range slice.
///
/// `slice_msgpack` is a zerompk encoding of `nodedb_array::query::Slice`.
/// `attr_projection` is the attribute index list; empty means all attributes.
#[derive(Debug, Clone, zerompk::ToMessagePack, zerompk::FromMessagePack)]
pub struct ArrayShardSliceReq {
    /// Array scoped by tenant + name (zerompk encoding of `nodedb_array::types::ArrayId`).
    pub array_id_msgpack: Vec<u8>,
    /// Zerompk encoding of `nodedb_array::query::Slice`.
    pub slice_msgpack: Vec<u8>,
    /// Attribute indices to project; empty means all attributes.
    pub attr_projection: Vec<u32>,
    /// Maximum number of rows the shard may return. When the shard stops at
    /// this bound it reports `ArrayShardSliceResp::truncated = true`.
    pub limit: u32,
    /// Optional surrogate pre-filter bitmap bytes (zerompk encoding of `SurrogateBitmap`).
    /// Empty slice means no filter.
    pub cell_filter_msgpack: Vec<u8>,
    /// Prefix bits used for Hilbert routing (1–16). 0 means no routing validation.
    ///
    /// When non-zero the shard verifies that its `local_vshard_id` is covered
    /// by the Hilbert ranges in `slice_hilbert_ranges` before executing the
    /// scan. A mismatch returns `ClusterError::WrongOwner` so the coordinator
    /// can retry against a refreshed routing table.
    pub prefix_bits: u8,
    /// Inclusive Hilbert-prefix ranges `(lo, hi)` that this slice covers,
    /// pre-computed by the coordinator from the slice predicate. Each entry
    /// is encoded as two consecutive `u64` values (little-endian). Empty
    /// means unbounded (no routing validation is performed).
    pub slice_hilbert_ranges: Vec<(u64, u64)>,
    /// Hilbert-prefix range `[lo, hi]` that this shard owns. The shard
    /// applies this as a pre-filter so it only returns cells whose Hilbert
    /// prefix falls within the range. `None` means unbounded (return all
    /// matching cells). Used by the distributed shard handler to prevent
    /// duplicate rows in single-node harnesses where all vShards share one
    /// Data Plane.
    pub shard_hilbert_range: Option<(u64, u64)>,
    /// Bitemporal system-time cutoff forwarded from `ArrayOp::Slice::system_as_of`.
    /// `None` = live read.
    pub system_as_of: Option<i64>,
    /// Bitemporal valid-time point forwarded from `ArrayOp::Slice::valid_at_ms`.
    /// `None` = no valid-time filter.
    pub valid_at_ms: Option<i64>,
}

/// Gather response: shard returns matching rows as opaque msgpack row bytes.
#[derive(Debug, Clone, zerompk::ToMessagePack, zerompk::FromMessagePack)]
pub struct ArrayShardSliceResp {
    /// Identifier of the shard that produced this response.
    pub shard_id: u32,
    /// Zerompk-encoded rows. Each element is a zerompk encoding of one result row.
    pub rows_msgpack: Vec<Vec<u8>>,
    /// True when the shard hit the `limit` and may have more rows.
    pub truncated: bool,
    /// True when `system_as_of` is below the oldest tile version on this shard
    /// and the shard produced zero rows as a result of that horizon.
    pub truncated_before_horizon: bool,
}

/// Scatter request: coordinator asks a shard to compute a partial aggregate.
///
/// Mirrors `ArrayOp::Aggregate` — attribute index + reducer + optional filter.
#[derive(Debug, Clone, zerompk::ToMessagePack, zerompk::FromMessagePack)]
pub struct ArrayShardAggReq {
    /// Array scoped by tenant + name (zerompk encoding of `nodedb_array::types::ArrayId`).
    pub array_id_msgpack: Vec<u8>,
    /// Index of the attribute being aggregated.
    pub attr_idx: u32,
    /// Zerompk encoding of `ArrayReducer` (c_enum, 1 byte on wire).
    pub reducer_msgpack: Vec<u8>,
    /// Negative means no group-by; non-negative is the dimension index.
    pub group_by_dim: i32,
    /// Optional surrogate pre-filter; empty = all cells.
    pub cell_filter_msgpack: Vec<u8>,
    /// Hilbert-prefix range `[lo, hi]` that this shard owns. The shard
    /// applies this as a pre-filter so it only counts cells whose Hilbert
    /// prefix falls within the range. `None` means unbounded (scan all).
    pub shard_hilbert_range: Option<(u64, u64)>,
    /// Bitemporal system-time cutoff forwarded from `ArrayOp::Aggregate::system_as_of`.
    /// `None` = live read.
    pub system_as_of: Option<i64>,
    /// Bitemporal valid-time point forwarded from `ArrayOp::Aggregate::valid_at_ms`.
    /// `None` = no valid-time filter.
    pub valid_at_ms: Option<i64>,
}

/// Gather response: shard returns partial aggregate(s) for merge.
#[derive(Debug, Clone, zerompk::ToMessagePack, zerompk::FromMessagePack)]
pub struct ArrayShardAggResp {
    /// Identifier of the shard that produced this response.
    pub shard_id: u32,
    /// One partial per group-by bucket (or one entry when group_by_dim < 0).
    pub partials: Vec<super::merge::ArrayAggPartial>,
    /// True when `system_as_of` is below the oldest tile version on this shard
    /// and the shard produced zero rows as a result of that horizon.
    pub truncated_before_horizon: bool,
}

/// Scatter request: coordinator forwards a cell write to the owning shard.
///
/// `cells_msgpack` is a zerompk encoding of
/// `Vec<nodedb::engine::array::wal::ArrayPutCell>`.
/// All cells in this batch belong to the same Hilbert-prefix bucket.
#[derive(Debug, Clone, zerompk::ToMessagePack, zerompk::FromMessagePack)]
pub struct ArrayShardPutReq {
    /// Array scoped by tenant + name (zerompk encoding of `nodedb_array::types::ArrayId`).
    pub array_id_msgpack: Vec<u8>,
    /// Zerompk encoding of `Vec<nodedb::engine::array::wal::ArrayPutCell>`.
    pub cells_msgpack: Vec<u8>,
    /// Coordinator-assigned WAL LSN for this batch — presumably acknowledged
    /// via `ArrayShardPutResp::applied_lsn` (NOTE(review): confirm against handler).
    pub wal_lsn: u64,
    /// Hilbert prefix of any representative cell in this batch (used by
    /// the shard handler to validate routing). Set to 0 when routing
    /// metadata is unavailable (pre-routing clients).
    pub representative_hilbert_prefix: u64,
    /// Prefix bits used for routing (1–16). 0 means no validation.
    pub prefix_bits: u8,
}

/// Acknowledgement: shard confirms the put was applied.
#[derive(Debug, Clone, zerompk::ToMessagePack, zerompk::FromMessagePack)]
pub struct ArrayShardPutResp {
    /// Identifier of the shard that applied the write.
    pub shard_id: u32,
    /// WAL LSN the shard reports as applied — presumably echoes
    /// `ArrayShardPutReq::wal_lsn` (NOTE(review): confirm against handler).
    pub applied_lsn: u64,
}

/// Scatter request: coordinator asks a shard to delete cells by exact coords.
///
/// `coords_msgpack` is a zerompk encoding of `Vec<Vec<CoordValue>>`.
#[derive(Debug, Clone, zerompk::ToMessagePack, zerompk::FromMessagePack)]
pub struct ArrayShardDeleteReq {
    /// Array scoped by tenant + name (zerompk encoding of `nodedb_array::types::ArrayId`).
    pub array_id_msgpack: Vec<u8>,
    /// Zerompk encoding of `Vec<Vec<CoordValue>>` — one coord vector per cell to delete.
    pub coords_msgpack: Vec<u8>,
    /// Coordinator-assigned WAL LSN for this batch — presumably acknowledged
    /// via `ArrayShardDeleteResp::applied_lsn` (NOTE(review): confirm against handler).
    pub wal_lsn: u64,
    /// Hilbert prefix of any representative coord in this batch. Used by the
    /// shard handler to validate that it still owns the target Hilbert bucket.
    /// Set to 0 when routing metadata is unavailable.
    pub representative_hilbert_prefix: u64,
    /// Prefix bits used for Hilbert routing (1–16). 0 means no validation.
    pub prefix_bits: u8,
}

/// Acknowledgement: shard confirms the delete was applied.
#[derive(Debug, Clone, zerompk::ToMessagePack, zerompk::FromMessagePack)]
pub struct ArrayShardDeleteResp {
    /// Identifier of the shard that applied the delete.
    pub shard_id: u32,
    /// WAL LSN the shard reports as applied — presumably echoes
    /// `ArrayShardDeleteReq::wal_lsn` (NOTE(review): confirm against handler).
    pub applied_lsn: u64,
}

/// Scatter request: coordinator asks a shard to run a surrogate bitmap scan.
///
/// Used by the cross-engine fusion path to collect surrogates for cells
/// matching a coord-range slice.
#[derive(Debug, Clone, zerompk::ToMessagePack, zerompk::FromMessagePack)]
pub struct ArrayShardSurrogateBitmapReq {
    /// Array scoped by tenant + name (zerompk encoding of `nodedb_array::types::ArrayId`).
    pub array_id_msgpack: Vec<u8>,
    /// Zerompk encoding of `nodedb_array::query::Slice`.
    pub slice_msgpack: Vec<u8>,
}

/// Response: shard returns a surrogate bitmap for matching cells.
#[derive(Debug, Clone, zerompk::ToMessagePack, zerompk::FromMessagePack)]
pub struct ArrayShardSurrogateBitmapResp {
    /// Identifier of the shard that produced this response.
    pub shard_id: u32,
    /// Zerompk encoding of `SurrogateBitmap` for the matching cells.
    pub bitmap_msgpack: Vec<u8>,
}