nodedb_cluster/distributed_array/coordinator/
write.rs1use std::sync::Arc;
10
11use crate::circuit_breaker::CircuitBreaker;
12use crate::error::{ClusterError, Result};
13
14use super::super::partition::{partition_delete_coords, partition_put_cells};
15use super::super::rpc::ShardRpcDispatch;
16use super::super::scatter::{FanOutPartitionedParams, fan_out_partitioned};
17use super::super::wire::{
18 ArrayShardDeleteReq, ArrayShardDeleteResp, ArrayShardPutReq, ArrayShardPutResp,
19};
20use super::read::decode_resps;
21
22pub struct ArrayWriteCoordParams {
24 pub source_node: u64,
25 pub timeout_ms: u64,
26}
27
28pub async fn coord_put_partitioned(
38 params: &ArrayWriteCoordParams,
39 per_shard: Vec<(u32, ArrayShardPutReq)>,
40 dispatch: &Arc<dyn ShardRpcDispatch>,
41 circuit_breaker: &Arc<CircuitBreaker>,
42) -> Result<Vec<ArrayShardPutResp>> {
43 if per_shard.is_empty() {
44 return Ok(Vec::new());
45 }
46
47 let fo_params = FanOutPartitionedParams {
48 timeout_ms: params.timeout_ms,
49 source_node: params.source_node,
50 };
51
52 let encoded: Result<Vec<(u32, Vec<u8>)>> = per_shard
53 .iter()
54 .map(|(shard_id, req)| {
55 zerompk::to_msgpack_vec(req)
56 .map(|bytes| (*shard_id, bytes))
57 .map_err(|e| ClusterError::Codec {
58 detail: format!("ArrayShardPutReq serialise (shard {shard_id}): {e}"),
59 })
60 })
61 .collect();
62
63 let raw = fan_out_partitioned(
64 &fo_params,
65 super::super::opcodes::ARRAY_SHARD_PUT_REQ,
66 &encoded?,
67 dispatch,
68 circuit_breaker,
69 )
70 .await?;
71
72 decode_resps::<ArrayShardPutResp>(&raw)
73}
74
75pub async fn coord_put(
89 params: &ArrayWriteCoordParams,
90 array_id_msgpack: Vec<u8>,
91 prefix_bits: u8,
92 wal_lsn: u64,
93 cells: &[(u64, Vec<u8>)],
94 dispatch: &Arc<dyn ShardRpcDispatch>,
95 circuit_breaker: &Arc<CircuitBreaker>,
96) -> Result<Vec<ArrayShardPutResp>> {
97 if cells.is_empty() {
98 return Ok(Vec::new());
99 }
100
101 let buckets = partition_put_cells(cells, prefix_bits)?;
102
103 let per_shard: Vec<(u32, ArrayShardPutReq)> = buckets
104 .into_iter()
105 .map(|b| {
106 let req = ArrayShardPutReq {
107 array_id_msgpack: array_id_msgpack.clone(),
108 cells_msgpack: b.cells_msgpack,
109 wal_lsn,
110 representative_hilbert_prefix: b.representative_hilbert_prefix,
111 prefix_bits,
112 };
113 (b.vshard_id, req)
114 })
115 .collect();
116
117 coord_put_partitioned(params, per_shard, dispatch, circuit_breaker).await
118}
119
120pub async fn coord_delete(
128 params: &ArrayWriteCoordParams,
129 array_id_msgpack: Vec<u8>,
130 prefix_bits: u8,
131 wal_lsn: u64,
132 coords: &[(u64, Vec<u8>)],
133 dispatch: &Arc<dyn ShardRpcDispatch>,
134 circuit_breaker: &Arc<CircuitBreaker>,
135) -> Result<Vec<ArrayShardDeleteResp>> {
136 if coords.is_empty() {
137 return Ok(Vec::new());
138 }
139
140 let buckets = partition_delete_coords(coords, prefix_bits)?;
141
142 let fo_params = FanOutPartitionedParams {
143 timeout_ms: params.timeout_ms,
144 source_node: params.source_node,
145 };
146
147 let encoded: Result<Vec<(u32, Vec<u8>)>> = buckets
148 .into_iter()
149 .map(|b| {
150 let req = ArrayShardDeleteReq {
151 array_id_msgpack: array_id_msgpack.clone(),
152 coords_msgpack: b.coords_msgpack,
153 wal_lsn,
154 representative_hilbert_prefix: b.representative_hilbert_prefix,
155 prefix_bits,
156 };
157 zerompk::to_msgpack_vec(&req)
158 .map(|bytes| (b.vshard_id, bytes))
159 .map_err(|e| ClusterError::Codec {
160 detail: format!("ArrayShardDeleteReq serialise (shard {}): {e}", b.vshard_id),
161 })
162 })
163 .collect();
164
165 let raw = fan_out_partitioned(
166 &fo_params,
167 super::super::opcodes::ARRAY_SHARD_DELETE_REQ,
168 &encoded?,
169 dispatch,
170 circuit_breaker,
171 )
172 .await?;
173
174 decode_resps::<ArrayShardDeleteResp>(&raw)
175}