Skip to main content

nodedb_cluster/distributed_array/
handler.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! Shard-side RPC handler for incoming array operations.
4//!
5//! `handle_array_shard_rpc` is called by the vshard dispatch table
6//! (see `crate::vshard_handler`) when an incoming `VShardEnvelope`
7//! carries an array opcode. It decodes the msgpack payload, delegates
8//! to the local array engine through `ArrayLocalExecutor`, and returns
9//! a serialised response envelope payload.
10//!
11//! The `ArrayLocalExecutor` trait is defined in `local_executor.rs` and
12//! implemented in the main `nodedb` binary, which has access to the SPSC
13//! bridge and the Data Plane array engine. This keeps `nodedb-cluster`
14//! free of a compile-time dependency on the engine crates.
15
16use std::sync::Arc;
17
18use crate::error::{ClusterError, Result};
19
20use super::local_executor::ArrayLocalExecutor;
21use super::opcodes::{
22    ARRAY_SHARD_AGG_REQ, ARRAY_SHARD_DELETE_REQ, ARRAY_SHARD_PUT_REQ, ARRAY_SHARD_SLICE_REQ,
23    ARRAY_SHARD_SURROGATE_BITMAP_REQ,
24};
25use super::routing::{array_vshard_for_tile, array_vshards_for_slice};
26use super::wire::{
27    ArrayShardAggReq, ArrayShardAggResp, ArrayShardDeleteReq, ArrayShardDeleteResp,
28    ArrayShardPutReq, ArrayShardPutResp, ArrayShardSliceReq, ArrayShardSliceResp,
29    ArrayShardSurrogateBitmapReq, ArrayShardSurrogateBitmapResp,
30};
31
32/// Dispatch an incoming array shard RPC to the appropriate local handler.
33///
34/// `opcode` is the `VShardMessageType` discriminant (u16).
35/// `local_vshard_id` is this shard's own vShard ID (for routing validation).
36/// `payload` is the zerompk-encoded request body.
37/// `executor` reaches into the local Data Plane array engine.
38///
39/// Returns the zerompk-encoded response body on success, ready to be placed
40/// into a response `VShardEnvelope`.
41pub async fn handle_array_shard_rpc(
42    opcode: u32,
43    local_vshard_id: u32,
44    payload: &[u8],
45    executor: &Arc<dyn ArrayLocalExecutor>,
46) -> Result<Vec<u8>> {
47    match opcode {
48        ARRAY_SHARD_SLICE_REQ => handle_slice(local_vshard_id, payload, executor).await,
49        ARRAY_SHARD_AGG_REQ => handle_agg(local_vshard_id, payload, executor).await,
50        ARRAY_SHARD_PUT_REQ => handle_put(local_vshard_id, payload, executor).await,
51        ARRAY_SHARD_DELETE_REQ => handle_delete(local_vshard_id, payload, executor).await,
52        ARRAY_SHARD_SURROGATE_BITMAP_REQ => {
53            handle_surrogate_bitmap(local_vshard_id, payload, executor).await
54        }
55        other => Err(ClusterError::Codec {
56            detail: format!("handle_array_shard_rpc: unknown opcode {other}"),
57        }),
58    }
59}
60
61async fn handle_slice(
62    local_vshard_id: u32,
63    payload: &[u8],
64    executor: &Arc<dyn ArrayLocalExecutor>,
65) -> Result<Vec<u8>> {
66    let req: ArrayShardSliceReq =
67        zerompk::from_msgpack(payload).map_err(|e| ClusterError::Codec {
68            detail: format!("ArrayShardSliceReq decode: {e}"),
69        })?;
70
71    validate_slice_routing(&req, local_vshard_id)?;
72
73    let rows = executor.exec_slice(&req).await?;
74
75    let truncated = req.limit > 0 && rows.len() >= req.limit as usize;
76    let resp = ArrayShardSliceResp {
77        shard_id: local_vshard_id,
78        rows_msgpack: rows,
79        truncated,
80        truncated_before_horizon: false,
81    };
82    serialise(resp)
83}
84
85async fn handle_agg(
86    local_vshard_id: u32,
87    payload: &[u8],
88    executor: &Arc<dyn ArrayLocalExecutor>,
89) -> Result<Vec<u8>> {
90    let req: ArrayShardAggReq =
91        zerompk::from_msgpack(payload).map_err(|e| ClusterError::Codec {
92            detail: format!("ArrayShardAggReq decode: {e}"),
93        })?;
94
95    let partials = executor.exec_agg(&req).await?;
96
97    let resp = ArrayShardAggResp {
98        shard_id: local_vshard_id,
99        partials,
100        truncated_before_horizon: false,
101    };
102    serialise(resp)
103}
104
105async fn handle_put(
106    local_vshard_id: u32,
107    payload: &[u8],
108    executor: &Arc<dyn ArrayLocalExecutor>,
109) -> Result<Vec<u8>> {
110    let req: ArrayShardPutReq =
111        zerompk::from_msgpack(payload).map_err(|e| ClusterError::Codec {
112            detail: format!("ArrayShardPutReq decode: {e}"),
113        })?;
114
115    // Validate routing before dispatching: the PUT must have been sent to
116    // the correct shard. A mismatch means the coordinator used a stale
117    // routing table and the write must be rejected so the caller can retry
118    // with a refreshed routing table rather than silently writing to the
119    // wrong shard.
120    validate_put_routing(&req, local_vshard_id)?;
121
122    let applied_lsn = executor.exec_put(&req).await?;
123    let resp = ArrayShardPutResp {
124        shard_id: local_vshard_id,
125        applied_lsn,
126    };
127    serialise(resp)
128}
129
130async fn handle_delete(
131    local_vshard_id: u32,
132    payload: &[u8],
133    executor: &Arc<dyn ArrayLocalExecutor>,
134) -> Result<Vec<u8>> {
135    let req: ArrayShardDeleteReq =
136        zerompk::from_msgpack(payload).map_err(|e| ClusterError::Codec {
137            detail: format!("ArrayShardDeleteReq decode: {e}"),
138        })?;
139
140    validate_delete_routing(&req, local_vshard_id)?;
141
142    let applied_lsn = executor
143        .exec_delete(&req.array_id_msgpack, &req.coords_msgpack, req.wal_lsn)
144        .await?;
145    let resp = ArrayShardDeleteResp {
146        shard_id: local_vshard_id,
147        applied_lsn,
148    };
149    serialise(resp)
150}
151
152async fn handle_surrogate_bitmap(
153    local_vshard_id: u32,
154    payload: &[u8],
155    executor: &Arc<dyn ArrayLocalExecutor>,
156) -> Result<Vec<u8>> {
157    let req: ArrayShardSurrogateBitmapReq =
158        zerompk::from_msgpack(payload).map_err(|e| ClusterError::Codec {
159            detail: format!("ArrayShardSurrogateBitmapReq decode: {e}"),
160        })?;
161
162    let bitmap_msgpack = executor
163        .exec_surrogate_bitmap_scan(&req.array_id_msgpack, &req.slice_msgpack)
164        .await?;
165
166    let resp = ArrayShardSurrogateBitmapResp {
167        shard_id: local_vshard_id,
168        bitmap_msgpack,
169    };
170    serialise(resp)
171}
172
173/// Validate that a PUT request is routed to the correct shard.
174///
175/// Returns `Ok(())` when routing is correct or when `prefix_bits == 0`
176/// (which means the request predates Hilbert routing and skips validation).
177/// Returns `Err(ClusterError::WrongOwner)` when the coordinator used a stale
178/// routing table; the coordinator should refresh and retry.
179fn validate_put_routing(req: &ArrayShardPutReq, local_vshard_id: u32) -> Result<()> {
180    if req.prefix_bits == 0 {
181        return Ok(());
182    }
183    let expected = array_vshard_for_tile(req.representative_hilbert_prefix, req.prefix_bits)?;
184    if expected != local_vshard_id {
185        return Err(ClusterError::WrongOwner {
186            vshard_id: local_vshard_id,
187            expected_owner_node: None,
188        });
189    }
190    Ok(())
191}
192
193/// Validate that a SLICE request is routed to the correct shard.
194///
195/// Returns `Ok(())` when `prefix_bits == 0` (no validation) or when
196/// `local_vshard_id` falls within the Hilbert ranges carried by the request.
197/// Returns `Err(ClusterError::WrongOwner)` when this shard no longer covers
198/// any of the requested ranges, indicating a stale routing table.
199fn validate_slice_routing(req: &ArrayShardSliceReq, local_vshard_id: u32) -> Result<()> {
200    if req.prefix_bits == 0 || req.slice_hilbert_ranges.is_empty() {
201        return Ok(());
202    }
203    // Compute which vShards overlap the slice ranges. If local_vshard_id is not
204    // among them, this node no longer owns the required Hilbert range.
205    let covered = array_vshards_for_slice(
206        &req.slice_hilbert_ranges,
207        req.prefix_bits,
208        crate::routing::VSHARD_COUNT,
209    )?;
210    if covered.binary_search(&local_vshard_id).is_err() {
211        return Err(ClusterError::WrongOwner {
212            vshard_id: local_vshard_id,
213            expected_owner_node: None,
214        });
215    }
216    Ok(())
217}
218
219/// Validate that a DELETE request is routed to the correct shard.
220///
221/// Returns `Ok(())` when routing is correct or when `prefix_bits == 0`.
222/// Returns `Err(ClusterError::WrongOwner)` on misroute.
223fn validate_delete_routing(req: &ArrayShardDeleteReq, local_vshard_id: u32) -> Result<()> {
224    if req.prefix_bits == 0 {
225        return Ok(());
226    }
227    let expected = array_vshard_for_tile(req.representative_hilbert_prefix, req.prefix_bits)?;
228    if expected != local_vshard_id {
229        return Err(ClusterError::WrongOwner {
230            vshard_id: local_vshard_id,
231            expected_owner_node: None,
232        });
233    }
234    Ok(())
235}
236
237fn serialise<T: zerompk::ToMessagePack>(val: T) -> Result<Vec<u8>> {
238    zerompk::to_msgpack_vec(&val).map_err(|e| ClusterError::Codec {
239        detail: format!("array response serialise: {e}"),
240    })
241}
242
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use async_trait::async_trait;

    use crate::distributed_array::merge::ArrayAggPartial;
    use crate::distributed_array::wire::{ArrayShardAggReq, ArrayShardPutReq};
    use crate::error::Result;

    use super::super::local_executor::ArrayLocalExecutor;
    use super::super::opcodes::{
        ARRAY_SHARD_AGG_REQ, ARRAY_SHARD_DELETE_REQ, ARRAY_SHARD_PUT_REQ, ARRAY_SHARD_SLICE_REQ,
        ARRAY_SHARD_SURROGATE_BITMAP_REQ,
    };
    use super::super::wire::{
        ArrayShardAggResp, ArrayShardDeleteResp, ArrayShardPutResp, ArrayShardSliceResp,
        ArrayShardSurrogateBitmapResp,
    };
    use super::handle_array_shard_rpc;

    /// Mock executor that returns a fixed set of row bytes for slice,
    /// a fixed bitmap for surrogate scan, fixed partials for agg, and
    /// echoes back the `wal_lsn` for put/delete.
    struct StubExecutor {
        rows: Vec<Vec<u8>>,
        bitmap: Vec<u8>,
        partials: Vec<ArrayAggPartial>,
    }

    #[async_trait]
    impl ArrayLocalExecutor for StubExecutor {
        async fn exec_slice(
            &self,
            _req: &super::super::wire::ArrayShardSliceReq,
        ) -> Result<Vec<Vec<u8>>> {
            Ok(self.rows.clone())
        }

        async fn exec_surrogate_bitmap_scan(
            &self,
            _array_id_msgpack: &[u8],
            _slice_msgpack: &[u8],
        ) -> Result<Vec<u8>> {
            Ok(self.bitmap.clone())
        }

        async fn exec_agg(&self, _req: &ArrayShardAggReq) -> Result<Vec<ArrayAggPartial>> {
            Ok(self.partials.clone())
        }

        async fn exec_put(&self, req: &ArrayShardPutReq) -> Result<u64> {
            Ok(req.wal_lsn)
        }

        async fn exec_delete(
            &self,
            _array_id_msgpack: &[u8],
            _coords_msgpack: &[u8],
            wal_lsn: u64,
        ) -> Result<u64> {
            Ok(wal_lsn)
        }
    }

    fn make_slice_req_bytes() -> Vec<u8> {
        let req = super::super::wire::ArrayShardSliceReq {
            array_id_msgpack: vec![],
            slice_msgpack: vec![],
            attr_projection: vec![],
            limit: 10,
            cell_filter_msgpack: vec![],
            // prefix_bits=0 skips routing validation in the handler.
            prefix_bits: 0,
            slice_hilbert_ranges: vec![],
            shard_hilbert_range: None,
            system_as_of: None,
            valid_at_ms: None,
        };
        zerompk::to_msgpack_vec(&req).unwrap()
    }

    fn make_agg_req_bytes() -> Vec<u8> {
        // Encode a minimal Sum reducer (c_enum byte).
        // Any single-byte msgpack value works as the reducer payload for
        // StubExecutor (it never decodes it).
        let reducer_bytes = vec![0x00u8]; // Sum = 0 as c_enum
        let req = super::super::wire::ArrayShardAggReq {
            array_id_msgpack: vec![],
            attr_idx: 0,
            reducer_msgpack: reducer_bytes,
            group_by_dim: -1,
            cell_filter_msgpack: vec![],
            shard_hilbert_range: None,
            system_as_of: None,
            valid_at_ms: None,
        };
        zerompk::to_msgpack_vec(&req).unwrap()
    }

    fn make_bitmap_req_bytes() -> Vec<u8> {
        let req = super::super::wire::ArrayShardSurrogateBitmapReq {
            array_id_msgpack: vec![],
            slice_msgpack: vec![],
        };
        zerompk::to_msgpack_vec(&req).unwrap()
    }

    #[tokio::test]
    async fn handle_slice_returns_executor_rows() {
        let row = b"row-data".to_vec();
        let executor: Arc<dyn ArrayLocalExecutor> = Arc::new(StubExecutor {
            rows: vec![row.clone(), row.clone()],
            bitmap: vec![],
            partials: vec![],
        });
        let payload = make_slice_req_bytes();
        let resp_bytes = handle_array_shard_rpc(ARRAY_SHARD_SLICE_REQ, 0, &payload, &executor)
            .await
            .expect("slice handler should succeed");

        let resp: ArrayShardSliceResp =
            zerompk::from_msgpack(&resp_bytes).expect("response should deserialise");
        assert_eq!(resp.rows_msgpack.len(), 2);
        assert_eq!(resp.rows_msgpack[0], row);
    }

    #[tokio::test]
    async fn handle_surrogate_bitmap_returns_executor_bitmap() {
        let bitmap = vec![0xDE, 0xAD, 0xBE, 0xEF];
        let executor: Arc<dyn ArrayLocalExecutor> = Arc::new(StubExecutor {
            rows: vec![],
            bitmap: bitmap.clone(),
            partials: vec![],
        });
        let payload = make_bitmap_req_bytes();
        let resp_bytes =
            handle_array_shard_rpc(ARRAY_SHARD_SURROGATE_BITMAP_REQ, 1, &payload, &executor)
                .await
                .expect("bitmap handler should succeed");

        let resp: ArrayShardSurrogateBitmapResp =
            zerompk::from_msgpack(&resp_bytes).expect("response should deserialise");
        assert_eq!(resp.shard_id, 1);
        assert_eq!(resp.bitmap_msgpack, bitmap);
    }

    #[tokio::test]
    async fn handle_agg_returns_executor_partials() {
        let partial = ArrayAggPartial::from_single(0, 42.0);
        let executor: Arc<dyn ArrayLocalExecutor> = Arc::new(StubExecutor {
            rows: vec![],
            bitmap: vec![],
            partials: vec![partial.clone()],
        });
        let payload = make_agg_req_bytes();
        let resp_bytes = handle_array_shard_rpc(ARRAY_SHARD_AGG_REQ, 3, &payload, &executor)
            .await
            .expect("agg handler should succeed");

        let resp: ArrayShardAggResp =
            zerompk::from_msgpack(&resp_bytes).expect("response should deserialise");
        assert_eq!(resp.shard_id, 3);
        assert_eq!(resp.partials.len(), 1);
        assert_eq!(resp.partials[0].count, 1);
        assert!((resp.partials[0].sum - 42.0).abs() < f64::EPSILON);
    }

    /// Build a minimal PUT request payload.
    ///
    /// All current callers pass `prefix_bits = 0`, which disables routing
    /// validation in the handler; the test that exercises routing rejection
    /// builds its own request inline (see
    /// `handle_put_rejects_misrouted_request`). `vshard_id` is kept in the
    /// signature for symmetry with the handler API but is unused while
    /// validation is disabled.
    //
    // NOTE: the previous version contained a dead conditional whose branches
    // both produced 0 and a misleading comment implying routing bits were
    // configured from `vshard_id`; that has been removed.
    fn make_put_req_bytes(vshard_id: u32, prefix_bits: u8) -> Vec<u8> {
        let _ = vshard_id;
        let req = super::super::wire::ArrayShardPutReq {
            array_id_msgpack: vec![],
            cells_msgpack: vec![],
            wal_lsn: 77,
            representative_hilbert_prefix: 0,
            prefix_bits,
        };
        zerompk::to_msgpack_vec(&req).unwrap()
    }

    fn make_delete_req_bytes() -> Vec<u8> {
        let req = super::super::wire::ArrayShardDeleteReq {
            array_id_msgpack: vec![],
            coords_msgpack: vec![],
            wal_lsn: 88,
            // prefix_bits=0 skips routing validation in the handler.
            representative_hilbert_prefix: 0,
            prefix_bits: 0,
        };
        zerompk::to_msgpack_vec(&req).unwrap()
    }

    #[tokio::test]
    async fn handle_put_delegates_to_executor_and_echoes_lsn() {
        let executor: Arc<dyn ArrayLocalExecutor> = Arc::new(StubExecutor {
            rows: vec![],
            bitmap: vec![],
            partials: vec![],
        });
        // prefix_bits=0 disables routing validation.
        let payload = make_put_req_bytes(0, 0);
        let resp_bytes = handle_array_shard_rpc(ARRAY_SHARD_PUT_REQ, 0, &payload, &executor)
            .await
            .expect("put handler should succeed");

        let resp: ArrayShardPutResp =
            zerompk::from_msgpack(&resp_bytes).expect("response should deserialise");
        assert_eq!(resp.shard_id, 0);
        assert_eq!(resp.applied_lsn, 77);
    }

    #[tokio::test]
    async fn handle_delete_delegates_to_executor_and_echoes_lsn() {
        let executor: Arc<dyn ArrayLocalExecutor> = Arc::new(StubExecutor {
            rows: vec![],
            bitmap: vec![],
            partials: vec![],
        });
        let payload = make_delete_req_bytes();
        let resp_bytes = handle_array_shard_rpc(ARRAY_SHARD_DELETE_REQ, 2, &payload, &executor)
            .await
            .expect("delete handler should succeed");

        let resp: ArrayShardDeleteResp =
            zerompk::from_msgpack(&resp_bytes).expect("response should deserialise");
        assert_eq!(resp.shard_id, 2);
        assert_eq!(resp.applied_lsn, 88);
    }

    #[tokio::test]
    async fn handle_put_rejects_misrouted_request() {
        let executor: Arc<dyn ArrayLocalExecutor> = Arc::new(StubExecutor {
            rows: vec![],
            bitmap: vec![],
            partials: vec![],
        });
        // prefix_bits=10, stride=1 → bucket = top 10 bits of hilbert_prefix.
        // hilbert_prefix = 0 → bucket 0 → expected vshard 0.
        // We tell the handler we're vshard 7 → mismatch → error.
        let req = super::super::wire::ArrayShardPutReq {
            array_id_msgpack: vec![],
            cells_msgpack: vec![],
            wal_lsn: 1,
            representative_hilbert_prefix: 0,
            prefix_bits: 10,
        };
        let payload = zerompk::to_msgpack_vec(&req).unwrap();
        let err = handle_array_shard_rpc(ARRAY_SHARD_PUT_REQ, 7, &payload, &executor)
            .await
            .expect_err("misrouted put should fail");
        assert!(
            matches!(
                err,
                crate::error::ClusterError::WrongOwner { vshard_id: 7, .. }
            ),
            "expected WrongOwner for vshard 7, got {err:?}"
        );
    }

    #[test]
    fn validate_slice_routing_rejects_disjoint_range() {
        // prefix_bits=10, stride=1 → vshard == bucket == top 10 bits.
        // Hilbert range [0x0040_0000_0000_0000, 0x0040_0000_0000_0000]
        // covers bucket 1 → vshard 1.
        // Local shard is vshard 5 → disjoint → WrongOwner.
        let req = super::super::wire::ArrayShardSliceReq {
            array_id_msgpack: vec![],
            slice_msgpack: vec![],
            attr_projection: vec![],
            limit: 10,
            cell_filter_msgpack: vec![],
            prefix_bits: 10,
            slice_hilbert_ranges: vec![(0x0040_0000_0000_0000, 0x0040_0000_0000_0000)],
            shard_hilbert_range: None,
            system_as_of: None,
            valid_at_ms: None,
        };
        let err = super::validate_slice_routing(&req, 5)
            .expect_err("disjoint Hilbert range should reject");
        assert!(
            matches!(
                err,
                crate::error::ClusterError::WrongOwner { vshard_id: 5, .. }
            ),
            "expected WrongOwner for vshard 5, got {err:?}"
        );
    }

    #[test]
    fn validate_slice_routing_accepts_overlapping_range() {
        // Same range as above → vshard 1. Local shard IS vshard 1 → accept.
        let req = super::super::wire::ArrayShardSliceReq {
            array_id_msgpack: vec![],
            slice_msgpack: vec![],
            attr_projection: vec![],
            limit: 10,
            cell_filter_msgpack: vec![],
            prefix_bits: 10,
            slice_hilbert_ranges: vec![(0x0040_0000_0000_0000, 0x0040_0000_0000_0000)],
            shard_hilbert_range: None,
            system_as_of: None,
            valid_at_ms: None,
        };
        super::validate_slice_routing(&req, 1).expect("overlapping range should accept");
    }

    #[tokio::test]
    async fn handle_unknown_opcode_returns_codec_error() {
        let executor: Arc<dyn ArrayLocalExecutor> = Arc::new(StubExecutor {
            rows: vec![],
            bitmap: vec![],
            partials: vec![],
        });
        let err = handle_array_shard_rpc(0xFF, 0, &[], &executor)
            .await
            .expect_err("unknown opcode should fail");
        assert!(
            matches!(err, crate::error::ClusterError::Codec { .. }),
            "{err:?}"
        );
    }
}