1use std::sync::Arc;
17
18use crate::error::{ClusterError, Result};
19
20use super::local_executor::ArrayLocalExecutor;
21use super::opcodes::{
22 ARRAY_SHARD_AGG_REQ, ARRAY_SHARD_DELETE_REQ, ARRAY_SHARD_PUT_REQ, ARRAY_SHARD_SLICE_REQ,
23 ARRAY_SHARD_SURROGATE_BITMAP_REQ,
24};
25use super::routing::{array_vshard_for_tile, array_vshards_for_slice};
26use super::wire::{
27 ArrayShardAggReq, ArrayShardAggResp, ArrayShardDeleteReq, ArrayShardDeleteResp,
28 ArrayShardPutReq, ArrayShardPutResp, ArrayShardSliceReq, ArrayShardSliceResp,
29 ArrayShardSurrogateBitmapReq, ArrayShardSurrogateBitmapResp,
30};
31
32pub async fn handle_array_shard_rpc(
42 opcode: u32,
43 local_vshard_id: u32,
44 payload: &[u8],
45 executor: &Arc<dyn ArrayLocalExecutor>,
46) -> Result<Vec<u8>> {
47 match opcode {
48 ARRAY_SHARD_SLICE_REQ => handle_slice(local_vshard_id, payload, executor).await,
49 ARRAY_SHARD_AGG_REQ => handle_agg(local_vshard_id, payload, executor).await,
50 ARRAY_SHARD_PUT_REQ => handle_put(local_vshard_id, payload, executor).await,
51 ARRAY_SHARD_DELETE_REQ => handle_delete(local_vshard_id, payload, executor).await,
52 ARRAY_SHARD_SURROGATE_BITMAP_REQ => {
53 handle_surrogate_bitmap(local_vshard_id, payload, executor).await
54 }
55 other => Err(ClusterError::Codec {
56 detail: format!("handle_array_shard_rpc: unknown opcode {other}"),
57 }),
58 }
59}
60
61async fn handle_slice(
62 local_vshard_id: u32,
63 payload: &[u8],
64 executor: &Arc<dyn ArrayLocalExecutor>,
65) -> Result<Vec<u8>> {
66 let req: ArrayShardSliceReq =
67 zerompk::from_msgpack(payload).map_err(|e| ClusterError::Codec {
68 detail: format!("ArrayShardSliceReq decode: {e}"),
69 })?;
70
71 validate_slice_routing(&req, local_vshard_id)?;
72
73 let rows = executor.exec_slice(&req).await?;
74
75 let truncated = req.limit > 0 && rows.len() >= req.limit as usize;
76 let resp = ArrayShardSliceResp {
77 shard_id: local_vshard_id,
78 rows_msgpack: rows,
79 truncated,
80 truncated_before_horizon: false,
81 };
82 serialise(resp)
83}
84
85async fn handle_agg(
86 local_vshard_id: u32,
87 payload: &[u8],
88 executor: &Arc<dyn ArrayLocalExecutor>,
89) -> Result<Vec<u8>> {
90 let req: ArrayShardAggReq =
91 zerompk::from_msgpack(payload).map_err(|e| ClusterError::Codec {
92 detail: format!("ArrayShardAggReq decode: {e}"),
93 })?;
94
95 let partials = executor.exec_agg(&req).await?;
96
97 let resp = ArrayShardAggResp {
98 shard_id: local_vshard_id,
99 partials,
100 truncated_before_horizon: false,
101 };
102 serialise(resp)
103}
104
105async fn handle_put(
106 local_vshard_id: u32,
107 payload: &[u8],
108 executor: &Arc<dyn ArrayLocalExecutor>,
109) -> Result<Vec<u8>> {
110 let req: ArrayShardPutReq =
111 zerompk::from_msgpack(payload).map_err(|e| ClusterError::Codec {
112 detail: format!("ArrayShardPutReq decode: {e}"),
113 })?;
114
115 validate_put_routing(&req, local_vshard_id)?;
121
122 let applied_lsn = executor.exec_put(&req).await?;
123 let resp = ArrayShardPutResp {
124 shard_id: local_vshard_id,
125 applied_lsn,
126 };
127 serialise(resp)
128}
129
130async fn handle_delete(
131 local_vshard_id: u32,
132 payload: &[u8],
133 executor: &Arc<dyn ArrayLocalExecutor>,
134) -> Result<Vec<u8>> {
135 let req: ArrayShardDeleteReq =
136 zerompk::from_msgpack(payload).map_err(|e| ClusterError::Codec {
137 detail: format!("ArrayShardDeleteReq decode: {e}"),
138 })?;
139
140 validate_delete_routing(&req, local_vshard_id)?;
141
142 let applied_lsn = executor
143 .exec_delete(&req.array_id_msgpack, &req.coords_msgpack, req.wal_lsn)
144 .await?;
145 let resp = ArrayShardDeleteResp {
146 shard_id: local_vshard_id,
147 applied_lsn,
148 };
149 serialise(resp)
150}
151
152async fn handle_surrogate_bitmap(
153 local_vshard_id: u32,
154 payload: &[u8],
155 executor: &Arc<dyn ArrayLocalExecutor>,
156) -> Result<Vec<u8>> {
157 let req: ArrayShardSurrogateBitmapReq =
158 zerompk::from_msgpack(payload).map_err(|e| ClusterError::Codec {
159 detail: format!("ArrayShardSurrogateBitmapReq decode: {e}"),
160 })?;
161
162 let bitmap_msgpack = executor
163 .exec_surrogate_bitmap_scan(&req.array_id_msgpack, &req.slice_msgpack)
164 .await?;
165
166 let resp = ArrayShardSurrogateBitmapResp {
167 shard_id: local_vshard_id,
168 bitmap_msgpack,
169 };
170 serialise(resp)
171}
172
173fn validate_put_routing(req: &ArrayShardPutReq, local_vshard_id: u32) -> Result<()> {
180 if req.prefix_bits == 0 {
181 return Ok(());
182 }
183 let expected = array_vshard_for_tile(req.representative_hilbert_prefix, req.prefix_bits)?;
184 if expected != local_vshard_id {
185 return Err(ClusterError::WrongOwner {
186 vshard_id: local_vshard_id,
187 expected_owner_node: None,
188 });
189 }
190 Ok(())
191}
192
193fn validate_slice_routing(req: &ArrayShardSliceReq, local_vshard_id: u32) -> Result<()> {
200 if req.prefix_bits == 0 || req.slice_hilbert_ranges.is_empty() {
201 return Ok(());
202 }
203 let covered = array_vshards_for_slice(
206 &req.slice_hilbert_ranges,
207 req.prefix_bits,
208 crate::routing::VSHARD_COUNT,
209 )?;
210 if covered.binary_search(&local_vshard_id).is_err() {
211 return Err(ClusterError::WrongOwner {
212 vshard_id: local_vshard_id,
213 expected_owner_node: None,
214 });
215 }
216 Ok(())
217}
218
219fn validate_delete_routing(req: &ArrayShardDeleteReq, local_vshard_id: u32) -> Result<()> {
224 if req.prefix_bits == 0 {
225 return Ok(());
226 }
227 let expected = array_vshard_for_tile(req.representative_hilbert_prefix, req.prefix_bits)?;
228 if expected != local_vshard_id {
229 return Err(ClusterError::WrongOwner {
230 vshard_id: local_vshard_id,
231 expected_owner_node: None,
232 });
233 }
234 Ok(())
235}
236
237fn serialise<T: zerompk::ToMessagePack>(val: T) -> Result<Vec<u8>> {
238 zerompk::to_msgpack_vec(&val).map_err(|e| ClusterError::Codec {
239 detail: format!("array response serialise: {e}"),
240 })
241}
242
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use async_trait::async_trait;

    use crate::distributed_array::merge::ArrayAggPartial;
    use crate::error::{ClusterError, Result};

    use super::super::local_executor::ArrayLocalExecutor;
    use super::super::opcodes::{
        ARRAY_SHARD_AGG_REQ, ARRAY_SHARD_DELETE_REQ, ARRAY_SHARD_PUT_REQ, ARRAY_SHARD_SLICE_REQ,
        ARRAY_SHARD_SURROGATE_BITMAP_REQ,
    };
    use super::super::wire::{
        ArrayShardAggReq, ArrayShardAggResp, ArrayShardDeleteReq, ArrayShardDeleteResp,
        ArrayShardPutReq, ArrayShardPutResp, ArrayShardSliceReq, ArrayShardSliceResp,
        ArrayShardSurrogateBitmapReq, ArrayShardSurrogateBitmapResp,
    };
    use super::{handle_array_shard_rpc, validate_slice_routing};

    /// Canned executor. Read paths return the stored data. Write paths echo the
    /// request's `wal_lsn` as the applied LSN, so the tests exercise only the
    /// handler layer.
    #[derive(Default)]
    struct StubExecutor {
        rows: Vec<Vec<u8>>,
        bitmap: Vec<u8>,
        partials: Vec<ArrayAggPartial>,
    }

    #[async_trait]
    impl ArrayLocalExecutor for StubExecutor {
        async fn exec_slice(&self, _req: &ArrayShardSliceReq) -> Result<Vec<Vec<u8>>> {
            Ok(self.rows.clone())
        }

        async fn exec_surrogate_bitmap_scan(
            &self,
            _array_id_msgpack: &[u8],
            _slice_msgpack: &[u8],
        ) -> Result<Vec<u8>> {
            Ok(self.bitmap.clone())
        }

        async fn exec_agg(&self, _req: &ArrayShardAggReq) -> Result<Vec<ArrayAggPartial>> {
            Ok(self.partials.clone())
        }

        async fn exec_put(&self, req: &ArrayShardPutReq) -> Result<u64> {
            Ok(req.wal_lsn)
        }

        async fn exec_delete(
            &self,
            _array_id_msgpack: &[u8],
            _coords_msgpack: &[u8],
            wal_lsn: u64,
        ) -> Result<u64> {
            Ok(wal_lsn)
        }
    }

    /// Wraps a stub in the trait object the handlers expect.
    fn executor(stub: StubExecutor) -> Arc<dyn ArrayLocalExecutor> {
        Arc::new(stub)
    }

    /// Encodes a request the same way a coordinator would send it.
    fn encode<T: zerompk::ToMessagePack>(req: &T) -> Vec<u8> {
        zerompk::to_msgpack_vec(req).expect("request should serialise")
    }

    /// Slice request with routing validation disabled (`prefix_bits == 0`) and `limit` 10.
    fn slice_req() -> ArrayShardSliceReq {
        ArrayShardSliceReq {
            array_id_msgpack: vec![],
            slice_msgpack: vec![],
            attr_projection: vec![],
            limit: 10,
            cell_filter_msgpack: vec![],
            prefix_bits: 0,
            slice_hilbert_ranges: vec![],
            shard_hilbert_range: None,
            system_as_of: None,
            valid_at_ms: None,
        }
    }

    /// Slice request with a single Hilbert range under a 10-bit prefix. The routing
    /// tests below expect this range to cover vshard 1 and not vshard 5.
    fn slice_req_covering_vshard_1() -> ArrayShardSliceReq {
        let mut req = slice_req();
        req.prefix_bits = 10;
        req.slice_hilbert_ranges = vec![(0x0040_0000_0000_0000, 0x0040_0000_0000_0000)];
        req
    }

    /// Aggregation request with no grouping (`group_by_dim == -1`). The one-byte
    /// reducer encoding is a placeholder that the stub executor never decodes.
    fn agg_req() -> ArrayShardAggReq {
        ArrayShardAggReq {
            array_id_msgpack: vec![],
            attr_idx: 0,
            reducer_msgpack: vec![0x00u8],
            group_by_dim: -1,
            cell_filter_msgpack: vec![],
            shard_hilbert_range: None,
            system_as_of: None,
            valid_at_ms: None,
        }
    }

    /// Surrogate-bitmap request with empty array id and slice.
    fn bitmap_req() -> ArrayShardSurrogateBitmapReq {
        ArrayShardSurrogateBitmapReq {
            array_id_msgpack: vec![],
            slice_msgpack: vec![],
        }
    }

    /// Put request with routing validation disabled and `wal_lsn` 77.
    fn put_req() -> ArrayShardPutReq {
        ArrayShardPutReq {
            array_id_msgpack: vec![],
            cells_msgpack: vec![],
            wal_lsn: 77,
            representative_hilbert_prefix: 0,
            prefix_bits: 0,
        }
    }

    /// Delete request with routing validation disabled and `wal_lsn` 88.
    fn delete_req() -> ArrayShardDeleteReq {
        ArrayShardDeleteReq {
            array_id_msgpack: vec![],
            coords_msgpack: vec![],
            wal_lsn: 88,
            representative_hilbert_prefix: 0,
            prefix_bits: 0,
        }
    }

    #[tokio::test]
    async fn handle_slice_returns_executor_rows() {
        let row = b"row-data".to_vec();
        let exec = executor(StubExecutor {
            rows: vec![row.clone(), row.clone()],
            ..Default::default()
        });
        let payload = encode(&slice_req());
        let resp_bytes = handle_array_shard_rpc(ARRAY_SHARD_SLICE_REQ, 0, &payload, &exec)
            .await
            .expect("slice handler should succeed");

        let resp: ArrayShardSliceResp =
            zerompk::from_msgpack(&resp_bytes).expect("response should deserialise");
        assert_eq!(resp.shard_id, 0);
        assert_eq!(resp.rows_msgpack.len(), 2);
        assert_eq!(resp.rows_msgpack[0], row);
        assert!(!resp.truncated, "2 rows under a limit of 10 must not be truncated");
    }

    #[tokio::test]
    async fn handle_slice_flags_truncation_when_limit_reached() {
        let exec = executor(StubExecutor {
            rows: vec![b"a".to_vec(), b"b".to_vec()],
            ..Default::default()
        });
        let mut req = slice_req();
        req.limit = 2;
        let payload = encode(&req);
        let resp_bytes = handle_array_shard_rpc(ARRAY_SHARD_SLICE_REQ, 0, &payload, &exec)
            .await
            .expect("slice handler should succeed");

        let resp: ArrayShardSliceResp =
            zerompk::from_msgpack(&resp_bytes).expect("response should deserialise");
        assert!(resp.truncated, "reaching the limit must report truncation");
    }

    #[tokio::test]
    async fn handle_surrogate_bitmap_returns_executor_bitmap() {
        let bitmap = vec![0xDE, 0xAD, 0xBE, 0xEF];
        let exec = executor(StubExecutor {
            bitmap: bitmap.clone(),
            ..Default::default()
        });
        let payload = encode(&bitmap_req());
        let resp_bytes =
            handle_array_shard_rpc(ARRAY_SHARD_SURROGATE_BITMAP_REQ, 1, &payload, &exec)
                .await
                .expect("bitmap handler should succeed");

        let resp: ArrayShardSurrogateBitmapResp =
            zerompk::from_msgpack(&resp_bytes).expect("response should deserialise");
        assert_eq!(resp.shard_id, 1);
        assert_eq!(resp.bitmap_msgpack, bitmap);
    }

    #[tokio::test]
    async fn handle_agg_returns_executor_partials() {
        let partial = ArrayAggPartial::from_single(0, 42.0);
        let exec = executor(StubExecutor {
            partials: vec![partial.clone()],
            ..Default::default()
        });
        let payload = encode(&agg_req());
        let resp_bytes = handle_array_shard_rpc(ARRAY_SHARD_AGG_REQ, 3, &payload, &exec)
            .await
            .expect("agg handler should succeed");

        let resp: ArrayShardAggResp =
            zerompk::from_msgpack(&resp_bytes).expect("response should deserialise");
        assert_eq!(resp.shard_id, 3);
        assert_eq!(resp.partials.len(), 1);
        assert_eq!(resp.partials[0].count, 1);
        assert!((resp.partials[0].sum - 42.0).abs() < f64::EPSILON);
    }

    #[tokio::test]
    async fn handle_put_delegates_to_executor_and_echoes_lsn() {
        let exec = executor(StubExecutor::default());
        let payload = encode(&put_req());
        let resp_bytes = handle_array_shard_rpc(ARRAY_SHARD_PUT_REQ, 0, &payload, &exec)
            .await
            .expect("put handler should succeed");

        let resp: ArrayShardPutResp =
            zerompk::from_msgpack(&resp_bytes).expect("response should deserialise");
        assert_eq!(resp.shard_id, 0);
        assert_eq!(resp.applied_lsn, 77);
    }

    #[tokio::test]
    async fn handle_delete_delegates_to_executor_and_echoes_lsn() {
        let exec = executor(StubExecutor::default());
        let payload = encode(&delete_req());
        let resp_bytes = handle_array_shard_rpc(ARRAY_SHARD_DELETE_REQ, 2, &payload, &exec)
            .await
            .expect("delete handler should succeed");

        let resp: ArrayShardDeleteResp =
            zerompk::from_msgpack(&resp_bytes).expect("response should deserialise");
        assert_eq!(resp.shard_id, 2);
        assert_eq!(resp.applied_lsn, 88);
    }

    #[tokio::test]
    async fn handle_put_rejects_misrouted_request() {
        let exec = executor(StubExecutor::default());
        let mut req = put_req();
        req.wal_lsn = 1;
        req.prefix_bits = 10;
        let payload = encode(&req);
        let err = handle_array_shard_rpc(ARRAY_SHARD_PUT_REQ, 7, &payload, &exec)
            .await
            .expect_err("misrouted put should fail");
        assert!(
            matches!(err, ClusterError::WrongOwner { vshard_id: 7, .. }),
            "expected WrongOwner for vshard 7, got {err:?}"
        );
    }

    #[tokio::test]
    async fn handle_delete_rejects_misrouted_request() {
        // Same tile prefix and prefix width as the misrouted put above, so it must
        // not route to vshard 7 either.
        let exec = executor(StubExecutor::default());
        let mut req = delete_req();
        req.prefix_bits = 10;
        let payload = encode(&req);
        let err = handle_array_shard_rpc(ARRAY_SHARD_DELETE_REQ, 7, &payload, &exec)
            .await
            .expect_err("misrouted delete should fail");
        assert!(
            matches!(err, ClusterError::WrongOwner { vshard_id: 7, .. }),
            "expected WrongOwner for vshard 7, got {err:?}"
        );
    }

    #[tokio::test]
    async fn handle_put_rejects_undecodable_payload() {
        let exec = executor(StubExecutor::default());
        let err = handle_array_shard_rpc(ARRAY_SHARD_PUT_REQ, 0, &[], &exec)
            .await
            .expect_err("empty payload cannot decode as a put request");
        assert!(matches!(err, ClusterError::Codec { .. }), "{err:?}");
    }

    #[test]
    fn validate_slice_routing_rejects_disjoint_range() {
        let err = validate_slice_routing(&slice_req_covering_vshard_1(), 5)
            .expect_err("disjoint Hilbert range should reject");
        assert!(
            matches!(err, ClusterError::WrongOwner { vshard_id: 5, .. }),
            "expected WrongOwner for vshard 5, got {err:?}"
        );
    }

    #[test]
    fn validate_slice_routing_accepts_overlapping_range() {
        validate_slice_routing(&slice_req_covering_vshard_1(), 1)
            .expect("overlapping range should accept");
    }

    #[tokio::test]
    async fn handle_unknown_opcode_returns_codec_error() {
        let exec = executor(StubExecutor::default());
        let err = handle_array_shard_rpc(0xFF, 0, &[], &exec)
            .await
            .expect_err("unknown opcode should fail");
        assert!(matches!(err, ClusterError::Codec { .. }), "{err:?}");
    }
}