Skip to main content

nodedb_cluster/distributed_array/coordinator/
write.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! Write-path coordinator for distributed array operations.
4//!
5//! Provides [`coord_put`], [`coord_put_partitioned`], and [`coord_delete`]
6//! — functions that partition a flat cell/coord list by Hilbert tile and
7//! fan writes to the owning vShards.
8
9use std::sync::Arc;
10
11use crate::circuit_breaker::CircuitBreaker;
12use crate::error::{ClusterError, Result};
13
14use super::super::partition::{partition_delete_coords, partition_put_cells};
15use super::super::rpc::ShardRpcDispatch;
16use super::super::scatter::{FanOutPartitionedParams, fan_out_partitioned};
17use super::super::wire::{
18    ArrayShardDeleteReq, ArrayShardDeleteResp, ArrayShardPutReq, ArrayShardPutResp,
19};
20use super::read::decode_resps;
21
/// Parameters for write-path coordinator entry points (partitioned fan-out).
///
/// Both fields are copied verbatim into `FanOutPartitionedParams` by the
/// coordinator functions in this module. The struct is plain data (two `u64`s),
/// so it derives the common traits: it can be logged with `{:?}`, compared in
/// tests, and reused across several coordinator calls without rebuilding it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ArrayWriteCoordParams {
    /// Forwarded as `FanOutPartitionedParams::source_node`.
    /// Presumably the id of the node issuing the write — confirm against the RPC layer.
    pub source_node: u64,
    /// Forwarded as `FanOutPartitionedParams::timeout_ms`.
    /// Presumably a fan-out timeout in milliseconds — confirm against `fan_out_partitioned`.
    pub timeout_ms: u64,
}
27
28/// Forward pre-partitioned cell writes to the owning shards.
29///
30/// The caller groups cells by Hilbert prefix bucket using
31/// `array_vshard_for_tile` and produces one `ArrayShardPutReq` per
32/// target shard. This function dispatches each batch to its shard via
33/// `fan_out_partitioned` and collects acknowledgements.
34///
35/// No cell payload is decoded inside this function — the coordinator
36/// has no dependency on `nodedb-array`.
37pub async fn coord_put_partitioned(
38    params: &ArrayWriteCoordParams,
39    per_shard: Vec<(u32, ArrayShardPutReq)>,
40    dispatch: &Arc<dyn ShardRpcDispatch>,
41    circuit_breaker: &Arc<CircuitBreaker>,
42) -> Result<Vec<ArrayShardPutResp>> {
43    if per_shard.is_empty() {
44        return Ok(Vec::new());
45    }
46
47    let fo_params = FanOutPartitionedParams {
48        timeout_ms: params.timeout_ms,
49        source_node: params.source_node,
50    };
51
52    let encoded: Result<Vec<(u32, Vec<u8>)>> = per_shard
53        .iter()
54        .map(|(shard_id, req)| {
55            zerompk::to_msgpack_vec(req)
56                .map(|bytes| (*shard_id, bytes))
57                .map_err(|e| ClusterError::Codec {
58                    detail: format!("ArrayShardPutReq serialise (shard {shard_id}): {e}"),
59                })
60        })
61        .collect();
62
63    let raw = fan_out_partitioned(
64        &fo_params,
65        super::super::opcodes::ARRAY_SHARD_PUT_REQ,
66        &encoded?,
67        dispatch,
68        circuit_breaker,
69    )
70    .await?;
71
72    decode_resps::<ArrayShardPutResp>(&raw)
73}
74
75/// Partition a flat cell list by Hilbert tile and fan out to owning shards.
76///
77/// `cells` — each element is `(hilbert_prefix, zerompk-encoded single-cell bytes)`.
78/// The Hilbert prefix is computed by the caller (the Control Plane planner) from
79/// the cell's coord tuple and the array schema; this function does not decode
80/// cell bytes.
81///
82/// `prefix_bits` — routing granularity (1–16) from the array catalog entry.
83/// `wal_lsn` — WAL sequence number allocated by the Control Plane for this batch.
84///
85/// Atomicity is per-shard only: if cells span multiple shards each shard's write
86/// is committed independently. A partial failure returns the first error encountered;
87/// cells that were already committed to other shards are not rolled back.
88pub async fn coord_put(
89    params: &ArrayWriteCoordParams,
90    array_id_msgpack: Vec<u8>,
91    prefix_bits: u8,
92    wal_lsn: u64,
93    cells: &[(u64, Vec<u8>)],
94    dispatch: &Arc<dyn ShardRpcDispatch>,
95    circuit_breaker: &Arc<CircuitBreaker>,
96) -> Result<Vec<ArrayShardPutResp>> {
97    if cells.is_empty() {
98        return Ok(Vec::new());
99    }
100
101    let buckets = partition_put_cells(cells, prefix_bits)?;
102
103    let per_shard: Vec<(u32, ArrayShardPutReq)> = buckets
104        .into_iter()
105        .map(|b| {
106            let req = ArrayShardPutReq {
107                array_id_msgpack: array_id_msgpack.clone(),
108                cells_msgpack: b.cells_msgpack,
109                wal_lsn,
110                representative_hilbert_prefix: b.representative_hilbert_prefix,
111                prefix_bits,
112            };
113            (b.vshard_id, req)
114        })
115        .collect();
116
117    coord_put_partitioned(params, per_shard, dispatch, circuit_breaker).await
118}
119
120/// Partition a flat coord list by Hilbert tile and fan delete requests to owning shards.
121///
122/// `coords` — each element is `(hilbert_prefix, zerompk-encoded single-coord bytes)`.
123/// `prefix_bits` — routing granularity (1–16).
124/// `wal_lsn` — WAL sequence number allocated by the Control Plane.
125///
126/// Atomicity is per-shard only (same contract as `coord_put`).
127pub async fn coord_delete(
128    params: &ArrayWriteCoordParams,
129    array_id_msgpack: Vec<u8>,
130    prefix_bits: u8,
131    wal_lsn: u64,
132    coords: &[(u64, Vec<u8>)],
133    dispatch: &Arc<dyn ShardRpcDispatch>,
134    circuit_breaker: &Arc<CircuitBreaker>,
135) -> Result<Vec<ArrayShardDeleteResp>> {
136    if coords.is_empty() {
137        return Ok(Vec::new());
138    }
139
140    let buckets = partition_delete_coords(coords, prefix_bits)?;
141
142    let fo_params = FanOutPartitionedParams {
143        timeout_ms: params.timeout_ms,
144        source_node: params.source_node,
145    };
146
147    let encoded: Result<Vec<(u32, Vec<u8>)>> = buckets
148        .into_iter()
149        .map(|b| {
150            let req = ArrayShardDeleteReq {
151                array_id_msgpack: array_id_msgpack.clone(),
152                coords_msgpack: b.coords_msgpack,
153                wal_lsn,
154                representative_hilbert_prefix: b.representative_hilbert_prefix,
155                prefix_bits,
156            };
157            zerompk::to_msgpack_vec(&req)
158                .map(|bytes| (b.vshard_id, bytes))
159                .map_err(|e| ClusterError::Codec {
160                    detail: format!("ArrayShardDeleteReq serialise (shard {}): {e}", b.vshard_id),
161                })
162        })
163        .collect();
164
165    let raw = fan_out_partitioned(
166        &fo_params,
167        super::super::opcodes::ARRAY_SHARD_DELETE_REQ,
168        &encoded?,
169        dispatch,
170        circuit_breaker,
171    )
172    .await?;
173
174    decode_resps::<ArrayShardDeleteResp>(&raw)
175}