1use super::VectorizerClient;
8use crate::error::{Result, VectorizerError};
9use crate::models::{
10 AddPeerRequest, FailoverReport, PeerInfo, RebalanceJob, ReplicaInfo, ReplicationConfig,
11 ReplicationStats, ReplicationStatus, ResyncJob,
12};
13
14impl VectorizerClient {
15 pub async fn get_replication_status(&self) -> Result<ReplicationStatus> {
19 let response = self
20 .make_request("GET", "/replication/status", None)
21 .await?;
22 serde_json::from_str(&response).map_err(|e| {
23 VectorizerError::server(format!(
24 "Failed to parse get_replication_status response: {e}"
25 ))
26 })
27 }
28
29 pub async fn configure_replication(&self, config: ReplicationConfig) -> Result<()> {
35 let payload = serde_json::to_value(&config).map_err(|e| {
36 VectorizerError::server(format!(
37 "Failed to serialize configure_replication request: {e}"
38 ))
39 })?;
40 self.make_request("POST", "/replication/configure", Some(payload))
41 .await?;
42 Ok(())
43 }
44
45 pub async fn get_replication_stats(&self) -> Result<ReplicationStats> {
50 let response = self.make_request("GET", "/replication/stats", None).await?;
51 serde_json::from_str(&response).map_err(|e| {
52 VectorizerError::server(format!(
53 "Failed to parse get_replication_stats response: {e}"
54 ))
55 })
56 }
57
58 pub async fn list_replicas(&self) -> Result<Vec<ReplicaInfo>> {
63 let response = self
64 .make_request("GET", "/replication/replicas", None)
65 .await?;
66 let val: serde_json::Value = serde_json::from_str(&response).map_err(|e| {
67 VectorizerError::server(format!("Failed to parse list_replicas response: {e}"))
68 })?;
69 let arr = val
70 .get("replicas")
71 .and_then(|r| r.as_array())
72 .cloned()
73 .unwrap_or_default();
74 arr.into_iter()
75 .map(|v| {
76 serde_json::from_value(v).map_err(|e| {
77 VectorizerError::server(format!("Failed to parse replica entry: {e}"))
78 })
79 })
80 .collect()
81 }
82
83 pub async fn cluster_failover(&self, replica_id: &str) -> Result<FailoverReport> {
89 let payload = serde_json::json!({ "replica_id": replica_id });
90 let response = self
91 .make_request("POST", "/cluster/failover", Some(payload))
92 .await?;
93 serde_json::from_str(&response).map_err(|e| {
94 VectorizerError::server(format!("Failed to parse cluster_failover response: {e}"))
95 })
96 }
97
98 pub async fn cluster_resync_replica(&self, replica_id: &str) -> Result<ResyncJob> {
102 let response = self
103 .make_request(
104 "POST",
105 &format!("/cluster/replicas/{replica_id}/resync"),
106 Some(serde_json::json!({})),
107 )
108 .await?;
109 serde_json::from_str(&response).map_err(|e| {
110 VectorizerError::server(format!(
111 "Failed to parse cluster_resync_replica response: {e}"
112 ))
113 })
114 }
115
116 pub async fn cluster_add_peer(&self, request: AddPeerRequest) -> Result<PeerInfo> {
120 let payload = serde_json::to_value(&request).map_err(|e| {
121 VectorizerError::server(format!("Failed to serialize cluster_add_peer request: {e}"))
122 })?;
123 let response = self
124 .make_request("POST", "/cluster/peers", Some(payload))
125 .await?;
126 serde_json::from_str(&response).map_err(|e| {
127 VectorizerError::server(format!("Failed to parse cluster_add_peer response: {e}"))
128 })
129 }
130
131 pub async fn cluster_rebalance(&self) -> Result<RebalanceJob> {
137 let response = self
138 .make_request("POST", "/cluster/rebalance", Some(serde_json::json!({})))
139 .await?;
140 serde_json::from_str(&response).map_err(|e| {
141 VectorizerError::server(format!("Failed to parse cluster_rebalance response: {e}"))
142 })
143 }
144
145 pub async fn cluster_rebalance_status(&self) -> Result<Option<RebalanceJob>> {
150 let response = self
151 .make_request("GET", "/cluster/rebalance/status", None)
152 .await?;
153 let val: serde_json::Value = serde_json::from_str(&response).map_err(|e| {
154 VectorizerError::server(format!(
155 "Failed to parse cluster_rebalance_status response: {e}"
156 ))
157 })?;
158 if val.get("status").and_then(|s| s.as_str()) == Some("idle") {
160 return Ok(None);
161 }
162 serde_json::from_value(val).map(Some).map_err(|e| {
163 VectorizerError::server(format!(
164 "Failed to deserialize cluster_rebalance_status: {e}"
165 ))
166 })
167 }
168}
169
#[cfg(test)]
mod tests {
    // `unwrap` is fine in tests: a failed (de)serialization should fail the test.
    #![allow(clippy::unwrap_used)]

    use serde_json::json;

    use crate::models::{
        AddPeerRequest, FailoverReport, PeerInfo, RebalanceJob, ReplicationConfig,
        ReplicationStats, ReplicationStatus, ResyncJob,
    };

    /// A payload carrying only `role` and `enabled` deserializes, leaving
    /// `stats` and `replicas` unset.
    #[test]
    fn replication_status_deserializes_standalone() {
        let payload = json!({
            "role": "Standalone",
            "enabled": false
        });
        let status = serde_json::from_value::<ReplicationStatus>(payload).unwrap();
        assert_eq!(status.role, "Standalone");
        assert!(!status.enabled);
        assert!(status.stats.is_none());
        assert!(status.replicas.is_none());
    }

    /// A payload with nested `stats` and an empty `replicas` array
    /// deserializes and exposes the stats.
    #[test]
    fn replication_status_deserializes_master() {
        let payload = json!({
            "role": "Master",
            "enabled": true,
            "stats": {
                "master_offset": 100,
                "replica_offset": 95,
                "lag_operations": 5,
                "total_replicated": 500
            },
            "replicas": []
        });
        let status = serde_json::from_value::<ReplicationStatus>(payload).unwrap();
        assert_eq!(status.role, "Master");
        assert!(status.enabled);
        assert!(status.stats.is_some());
    }

    /// A master-style configuration serializes its set fields and omits the
    /// unset `master_address`.
    #[test]
    fn replication_config_serializes_master() {
        let config = ReplicationConfig {
            role: "master".into(),
            bind_address: Some("0.0.0.0:15010".into()),
            master_address: None,
            heartbeat_interval: Some(1000),
            log_size: None,
        };
        let encoded = serde_json::to_value(&config).unwrap();
        assert_eq!(encoded["role"], "master");
        assert_eq!(encoded["bind_address"], "0.0.0.0:15010");
        assert_eq!(encoded["heartbeat_interval"], 1000);
        assert!(encoded.get("master_address").is_none());
    }

    /// A replica-style configuration serializes its role and master address.
    #[test]
    fn replication_config_serializes_replica() {
        let config = ReplicationConfig {
            role: "replica".into(),
            bind_address: None,
            master_address: Some("master.host:15010".into()),
            heartbeat_interval: None,
            log_size: None,
        };
        let encoded = serde_json::to_value(&config).unwrap();
        assert_eq!(encoded["role"], "replica");
        assert_eq!(encoded["master_address"], "master.host:15010");
    }

    /// Replication statistics deserialize, including the optional
    /// `bytes_sent` and `connected_replicas` fields.
    #[test]
    fn replication_stats_round_trip() {
        let payload = json!({
            "master_offset": 200,
            "replica_offset": 190,
            "lag_operations": 10,
            "total_replicated": 1000,
            "bytes_sent": 4096u64,
            "connected_replicas": 2
        });
        let decoded = serde_json::from_value::<ReplicationStats>(payload).unwrap();
        assert_eq!(decoded.master_offset, 200);
        assert_eq!(decoded.lag_operations, 10);
        assert_eq!(decoded.bytes_sent, Some(4096));
        assert_eq!(decoded.connected_replicas, Some(2));
    }

    /// A failover report deserializes its replica id and offsets.
    #[test]
    fn failover_report_round_trip() {
        let payload = json!({
            "promoted_replica_id": "replica-1",
            "master_offset_at_promotion": 1000u64,
            "replica_offset_at_promotion": 999u64,
            "residual_lag_operations": 1u64
        });
        let report = serde_json::from_value::<FailoverReport>(payload).unwrap();
        assert_eq!(report.promoted_replica_id, "replica-1");
        assert_eq!(report.master_offset_at_promotion, 1000);
        assert_eq!(report.residual_lag_operations, 1);
    }

    /// A resync job deserializes its replica id, snapshot offset and
    /// full-snapshot flag.
    #[test]
    fn resync_job_round_trip() {
        let payload = json!({
            "replica_id": "replica-2",
            "snapshot_offset": 5000u64,
            "full_snapshot": true
        });
        let job = serde_json::from_value::<ResyncJob>(payload).unwrap();
        assert_eq!(job.replica_id, "replica-2");
        assert_eq!(job.snapshot_offset, 5000);
        assert!(job.full_snapshot);
    }

    /// Peer information deserializes its node id and role.
    #[test]
    fn peer_info_round_trip() {
        let payload = json!({
            "node_id": "peer-abc",
            "address": "10.0.0.2:15003",
            "role": "member"
        });
        let peer = serde_json::from_value::<PeerInfo>(payload).unwrap();
        assert_eq!(peer.node_id, "peer-abc");
        assert_eq!(peer.role, "member");
    }

    /// An add-peer request serializes both of its fields under their own names.
    #[test]
    fn add_peer_request_serializes() {
        let request = AddPeerRequest {
            address: "10.0.0.3:15003".into(),
            role: "observer".into(),
        };
        let encoded = serde_json::to_value(&request).unwrap();
        assert_eq!(encoded["address"], "10.0.0.3:15003");
        assert_eq!(encoded["role"], "observer");
    }

    /// A rebalance job deserializes, and the absent `last_checkpoint_node`
    /// is left unset.
    #[test]
    fn rebalance_job_round_trip() {
        let payload = json!({
            "job_id": "job-xyz",
            "status": "running",
            "shards_to_move": 4usize,
            "shards_moved": 1usize,
            "message": "Rebalance started"
        });
        let job = serde_json::from_value::<RebalanceJob>(payload).unwrap();
        assert_eq!(job.job_id, "job-xyz");
        assert_eq!(job.status, "running");
        assert_eq!(job.shards_to_move, 4);
        assert!(job.last_checkpoint_node.is_none());
    }
}