Skip to main content

nodedb_cluster/distributed_array/
wire.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! Wire types for array shard RPC messages.
4//!
5//! All request/response pairs are zerompk-serialisable. Complex sub-payloads
6//! (slice predicates, cell batches) are carried as opaque msgpack bytes —
7//! the same convention used by `ArrayOp` in the bridge physical plan — so
8//! this crate does not need a compile-time dependency on `nodedb-array`.
9
10/// Scatter request: coordinator asks a shard to execute a coord-range slice.
11///
12/// `slice_msgpack` is a zerompk encoding of `nodedb_array::query::Slice`.
13/// `attr_projection` is the attribute index list; empty means all attributes.
///
/// NOTE(review): the wire layout comes from the derived zerompk impls, so it
/// presumably depends on field order and/or names — confirm before reordering,
/// renaming, or retyping any field.
14#[derive(Debug, Clone, zerompk::ToMessagePack, zerompk::FromMessagePack)]
15pub struct ArrayShardSliceReq {
16    /// Array scoped by tenant + name (zerompk encoding of `nodedb_array::types::ArrayId`).
17    pub array_id_msgpack: Vec<u8>,
18    /// Zerompk encoding of `nodedb_array::query::Slice`.
19    pub slice_msgpack: Vec<u8>,
    /// Attribute indices to project; an empty list means all attributes.
20    pub attr_projection: Vec<u32>,
    /// Row limit for this request; `ArrayShardSliceResp::truncated` reports
    /// when the shard hit it.
    /// NOTE(review): whether 0 means "unlimited" is not visible here — confirm
    /// against the shard handler.
21    pub limit: u32,
22    /// Optional surrogate pre-filter bitmap bytes (zerompk encoding of `SurrogateBitmap`).
23    /// An empty byte vector means no filter.
24    pub cell_filter_msgpack: Vec<u8>,
25    /// Prefix bits used for Hilbert routing (1–16). 0 means no routing validation.
26    ///
27    /// When non-zero the shard verifies that its `local_vshard_id` is covered
28    /// by the Hilbert ranges in `slice_hilbert_ranges` before executing the
29    /// scan. A mismatch returns `ClusterError::WrongOwner` so the coordinator
30    /// can retry against a refreshed routing table.
31    pub prefix_bits: u8,
32    /// Inclusive Hilbert-prefix ranges `(lo, hi)` that this slice covers,
33    /// pre-computed by the coordinator from the slice predicate. Each entry
34    /// is encoded as two consecutive `u64` values (little-endian). Empty
35    /// means unbounded (no routing validation is performed).
    ///
    /// NOTE(review): this field is a typed `Vec<(u64, u64)>` handled by the
    /// derived zerompk impls, so the byte-level "little-endian" remark above
    /// may be stale — confirm the actual tuple encoding.
36    pub slice_hilbert_ranges: Vec<(u64, u64)>,
37    /// Hilbert-prefix range `[lo, hi]` that this shard owns. The shard
38    /// applies this as a pre-filter so it only returns cells whose Hilbert
39    /// prefix falls within the range. `None` means unbounded (return all
40    /// matching cells). Used by the distributed shard handler to prevent
41    /// duplicate rows in single-node harnesses where all vShards share one
42    /// Data Plane.
43    pub shard_hilbert_range: Option<(u64, u64)>,
44    /// Bitemporal system-time cutoff forwarded from `ArrayOp::Slice::system_as_of`.
45    /// `None` = live read.
    /// NOTE(review): the unit is not stated here (presumably milliseconds, as
    /// for `valid_at_ms`) — confirm.
46    pub system_as_of: Option<i64>,
47    /// Bitemporal valid-time point forwarded from `ArrayOp::Slice::valid_at_ms`.
48    /// `None` = no valid-time filter.
49    pub valid_at_ms: Option<i64>,
50}
51
52/// Gather response: shard returns matching rows as opaque msgpack row bytes.
53#[derive(Debug, Clone, zerompk::ToMessagePack, zerompk::FromMessagePack)]
54pub struct ArrayShardSliceResp {
    /// Presumably the id of the shard that produced this response — confirm
    /// whether this is the vShard id or a node-level id.
55    pub shard_id: u32,
56    /// Zerompk-encoded rows. Each element is a zerompk encoding of one result row.
57    pub rows_msgpack: Vec<Vec<u8>>,
58    /// True when the shard hit the request's `limit` and may have more rows.
59    pub truncated: bool,
60    /// True when `system_as_of` is below the oldest tile version on this shard
61    /// and the shard produced zero rows as a result of that horizon.
62    pub truncated_before_horizon: bool,
63}
64
65/// Scatter request: coordinator asks a shard to compute a partial aggregate.
66///
67/// Mirrors `ArrayOp::Aggregate` — attribute index + reducer + optional filter.
68#[derive(Debug, Clone, zerompk::ToMessagePack, zerompk::FromMessagePack)]
69pub struct ArrayShardAggReq {
    /// Opaque array identifier bytes.
    /// NOTE(review): presumably the same zerompk `ArrayId` encoding used by
    /// `ArrayShardSliceReq::array_id_msgpack` — confirm.
70    pub array_id_msgpack: Vec<u8>,
    /// Index of the attribute to aggregate.
71    pub attr_idx: u32,
72    /// Zerompk encoding of `ArrayReducer` (c_enum, 1 byte on wire).
73    pub reducer_msgpack: Vec<u8>,
74    /// Negative means no group-by; non-negative is the dimension index.
75    pub group_by_dim: i32,
76    /// Optional surrogate pre-filter; empty = all cells.
77    pub cell_filter_msgpack: Vec<u8>,
78    /// Hilbert-prefix range `[lo, hi]` that this shard owns. The shard
79    /// applies this as a pre-filter so it only counts cells whose Hilbert
80    /// prefix falls within the range. `None` means unbounded (scan all).
81    pub shard_hilbert_range: Option<(u64, u64)>,
82    /// Bitemporal system-time cutoff forwarded from `ArrayOp::Aggregate::system_as_of`.
83    /// `None` = live read.
84    pub system_as_of: Option<i64>,
85    /// Bitemporal valid-time point forwarded from `ArrayOp::Aggregate::valid_at_ms`.
86    /// `None` = no valid-time filter.
87    pub valid_at_ms: Option<i64>,
88}
89
90/// Gather response: shard returns partial aggregate(s) for merge.
91#[derive(Debug, Clone, zerompk::ToMessagePack, zerompk::FromMessagePack)]
92pub struct ArrayShardAggResp {
    /// Presumably the id of the shard that produced these partials — confirm.
93    pub shard_id: u32,
94    /// One partial per group-by bucket (or one entry when group_by_dim < 0).
95    pub partials: Vec<super::merge::ArrayAggPartial>,
96    /// True when `system_as_of` is below the oldest tile version on this shard
97    /// and the shard produced zero rows as a result of that horizon.
    ///
    /// NOTE(review): "zero rows" matches the slice response wording; for an
    /// aggregate this presumably means no cells contributed to `partials` —
    /// confirm the exact condition in the shard handler.
98    pub truncated_before_horizon: bool,
99}
100
101/// Scatter request: coordinator forwards a cell write to the owning shard.
102///
103/// `cells_msgpack` is a zerompk encoding of
104/// `Vec<nodedb::engine::array::wal::ArrayPutCell>`.
105/// All cells in this batch belong to the same Hilbert-prefix bucket.
106#[derive(Debug, Clone, zerompk::ToMessagePack, zerompk::FromMessagePack)]
107pub struct ArrayShardPutReq {
    /// Opaque array identifier bytes.
    /// NOTE(review): presumably a zerompk-encoded `ArrayId` — confirm.
108    pub array_id_msgpack: Vec<u8>,
    /// Zerompk-encoded batch of cells to write (see the struct-level docs).
109    pub cells_msgpack: Vec<u8>,
    /// NOTE(review): presumably the WAL sequence number associated with this
    /// write — confirm who assigns it and how it relates to
    /// `ArrayShardPutResp::applied_lsn`.
110    pub wal_lsn: u64,
111    /// Hilbert prefix of any representative cell in this batch (used by
112    /// the shard handler to validate routing). Set to 0 when routing
113    /// metadata is unavailable (pre-routing clients).
    ///
    /// NOTE(review): 0 may also be a genuine prefix value, so `prefix_bits == 0`
    /// appears to be the actual "skip validation" signal — confirm.
114    pub representative_hilbert_prefix: u64,
115    /// Prefix bits used for routing (1–16). 0 means no validation.
116    pub prefix_bits: u8,
117}
118
119/// Acknowledgement: shard confirms the put was applied.
120#[derive(Debug, Clone, zerompk::ToMessagePack, zerompk::FromMessagePack)]
121pub struct ArrayShardPutResp {
    /// Presumably the id of the shard that applied the put — confirm.
122    pub shard_id: u32,
    /// NOTE(review): presumably the LSN at which the put was applied; whether
    /// it always equals the request's `wal_lsn` is not visible here — confirm.
123    pub applied_lsn: u64,
124}
125
126/// Scatter request: coordinator asks a shard to delete cells by exact coords.
127///
128/// `coords_msgpack` is a zerompk encoding of `Vec<Vec<CoordValue>>`.
129#[derive(Debug, Clone, zerompk::ToMessagePack, zerompk::FromMessagePack)]
130pub struct ArrayShardDeleteReq {
    /// Opaque array identifier bytes.
    /// NOTE(review): presumably a zerompk-encoded `ArrayId` — confirm.
131    pub array_id_msgpack: Vec<u8>,
    /// Zerompk-encoded list of exact coordinates to delete (see the
    /// struct-level docs for the encoded type).
132    pub coords_msgpack: Vec<u8>,
    /// NOTE(review): presumably the WAL sequence number associated with this
    /// delete — confirm who assigns it.
133    pub wal_lsn: u64,
134    /// Hilbert prefix of any representative coord in this batch. Used by the
135    /// shard handler to validate that it still owns the target Hilbert bucket.
136    /// Set to 0 when routing metadata is unavailable.
137    pub representative_hilbert_prefix: u64,
138    /// Prefix bits used for Hilbert routing (1–16). 0 means no validation.
139    pub prefix_bits: u8,
140}
141
142/// Acknowledgement: shard confirms the delete was applied.
143#[derive(Debug, Clone, zerompk::ToMessagePack, zerompk::FromMessagePack)]
144pub struct ArrayShardDeleteResp {
    /// Presumably the id of the shard that applied the delete — confirm.
145    pub shard_id: u32,
    /// NOTE(review): presumably the LSN at which the delete was applied;
    /// its relation to the request's `wal_lsn` is not visible here — confirm.
146    pub applied_lsn: u64,
147}
148
149/// Scatter request: coordinator asks a shard to run a surrogate bitmap scan.
150///
151/// Used by the cross-engine fusion path to collect surrogates for cells
152/// matching a coord-range slice.
153#[derive(Debug, Clone, zerompk::ToMessagePack, zerompk::FromMessagePack)]
154pub struct ArrayShardSurrogateBitmapReq {
    /// Opaque array identifier bytes.
    /// NOTE(review): presumably a zerompk-encoded `ArrayId` — confirm.
155    pub array_id_msgpack: Vec<u8>,
156    /// Zerompk encoding of `nodedb_array::query::Slice`.
157    pub slice_msgpack: Vec<u8>,
158}
159
160/// Response: shard returns a surrogate bitmap for matching cells.
161#[derive(Debug, Clone, zerompk::ToMessagePack, zerompk::FromMessagePack)]
162pub struct ArrayShardSurrogateBitmapResp {
    /// Presumably the id of the shard that produced this bitmap — confirm.
163    pub shard_id: u32,
164    /// Zerompk encoding of `SurrogateBitmap` for the matching cells.
165    pub bitmap_msgpack: Vec<u8>,
166}