Skip to main content

vectorizer_sdk/client/
replication.rs

1//! Replication surface.
2//!
3//! Covers the `/replication/*` REST endpoints (status, configuration,
4//! statistics, replica listing) and the `/cluster/*` admin endpoints
5//! added in phase15 (failover, resync, add-peer, rebalance).
6
7use super::VectorizerClient;
8use crate::error::{Result, VectorizerError};
9use crate::models::{
10    AddPeerRequest, FailoverReport, PeerInfo, RebalanceJob, ReplicaInfo, ReplicationConfig,
11    ReplicationStats, ReplicationStatus, ResyncJob,
12};
13
14impl VectorizerClient {
15    /// Get the current replication status and role of this node.
16    ///
17    /// Calls `GET /replication/status`.
18    pub async fn get_replication_status(&self) -> Result<ReplicationStatus> {
19        let response = self
20            .make_request("GET", "/replication/status", None)
21            .await?;
22        serde_json::from_str(&response).map_err(|e| {
23            VectorizerError::server(format!(
24                "Failed to parse get_replication_status response: {e}"
25            ))
26        })
27    }
28
29    /// Configure this node's replication role and parameters.
30    ///
31    /// Calls `POST /replication/configure` with the full
32    /// [`ReplicationConfig`]. A server restart is required for the
33    /// new config to take effect.
34    pub async fn configure_replication(&self, config: ReplicationConfig) -> Result<()> {
35        let payload = serde_json::to_value(&config).map_err(|e| {
36            VectorizerError::server(format!(
37                "Failed to serialize configure_replication request: {e}"
38            ))
39        })?;
40        self.make_request("POST", "/replication/configure", Some(payload))
41            .await?;
42        Ok(())
43    }
44
45    /// Get raw replication statistics for the active replication node.
46    ///
47    /// Calls `GET /replication/stats`. Returns an error when
48    /// replication is not enabled on this node.
49    pub async fn get_replication_stats(&self) -> Result<ReplicationStats> {
50        let response = self.make_request("GET", "/replication/stats", None).await?;
51        serde_json::from_str(&response).map_err(|e| {
52            VectorizerError::server(format!(
53                "Failed to parse get_replication_stats response: {e}"
54            ))
55        })
56    }
57
58    /// List the replica nodes connected to this master.
59    ///
60    /// Calls `GET /replication/replicas`. Only available on master
61    /// nodes; returns an error otherwise.
62    pub async fn list_replicas(&self) -> Result<Vec<ReplicaInfo>> {
63        let response = self
64            .make_request("GET", "/replication/replicas", None)
65            .await?;
66        let val: serde_json::Value = serde_json::from_str(&response).map_err(|e| {
67            VectorizerError::server(format!("Failed to parse list_replicas response: {e}"))
68        })?;
69        let arr = val
70            .get("replicas")
71            .and_then(|r| r.as_array())
72            .cloned()
73            .unwrap_or_default();
74        arr.into_iter()
75            .map(|v| {
76                serde_json::from_value(v).map_err(|e| {
77                    VectorizerError::server(format!("Failed to parse replica entry: {e}"))
78                })
79            })
80            .collect()
81    }
82
83    /// Trigger a failover — promote a replica to primary.
84    ///
85    /// Calls `POST /cluster/failover` with `{replica_id}`.
86    /// Returns 409 from the server when the replica's WAL lag exceeds the
87    /// configured threshold.
88    pub async fn cluster_failover(&self, replica_id: &str) -> Result<FailoverReport> {
89        let payload = serde_json::json!({ "replica_id": replica_id });
90        let response = self
91            .make_request("POST", "/cluster/failover", Some(payload))
92            .await?;
93        serde_json::from_str(&response).map_err(|e| {
94            VectorizerError::server(format!("Failed to parse cluster_failover response: {e}"))
95        })
96    }
97
98    /// Force a full resync on a replica.
99    ///
100    /// Calls `POST /cluster/replicas/{id}/resync` with an empty body.
101    pub async fn cluster_resync_replica(&self, replica_id: &str) -> Result<ResyncJob> {
102        let response = self
103            .make_request(
104                "POST",
105                &format!("/cluster/replicas/{replica_id}/resync"),
106                Some(serde_json::json!({})),
107            )
108            .await?;
109        serde_json::from_str(&response).map_err(|e| {
110            VectorizerError::server(format!(
111                "Failed to parse cluster_resync_replica response: {e}"
112            ))
113        })
114    }
115
116    /// Add a peer to the cluster.
117    ///
118    /// Calls `POST /cluster/peers` with `{address, role}`.
119    pub async fn cluster_add_peer(&self, request: AddPeerRequest) -> Result<PeerInfo> {
120        let payload = serde_json::to_value(&request).map_err(|e| {
121            VectorizerError::server(format!("Failed to serialize cluster_add_peer request: {e}"))
122        })?;
123        let response = self
124            .make_request("POST", "/cluster/peers", Some(payload))
125            .await?;
126        serde_json::from_str(&response).map_err(|e| {
127            VectorizerError::server(format!("Failed to parse cluster_add_peer response: {e}"))
128        })
129    }
130
131    /// Trigger a shard rebalance across all active cluster nodes.
132    ///
133    /// Calls `POST /cluster/rebalance` with an empty body.
134    /// Returns 400 when fewer than 2 active nodes are present, or 400 when
135    /// a rebalance is already in progress.
136    pub async fn cluster_rebalance(&self) -> Result<RebalanceJob> {
137        let response = self
138            .make_request("POST", "/cluster/rebalance", Some(serde_json::json!({})))
139            .await?;
140        serde_json::from_str(&response).map_err(|e| {
141            VectorizerError::server(format!("Failed to parse cluster_rebalance response: {e}"))
142        })
143    }
144
145    /// Return progress of the active (or last completed) rebalance job.
146    ///
147    /// Calls `GET /cluster/rebalance/status`.
148    /// Returns `None` when no rebalance has been triggered on this node.
149    pub async fn cluster_rebalance_status(&self) -> Result<Option<RebalanceJob>> {
150        let response = self
151            .make_request("GET", "/cluster/rebalance/status", None)
152            .await?;
153        let val: serde_json::Value = serde_json::from_str(&response).map_err(|e| {
154            VectorizerError::server(format!(
155                "Failed to parse cluster_rebalance_status response: {e}"
156            ))
157        })?;
158        // Server returns {status: "idle"} when no rebalance has been triggered.
159        if val.get("status").and_then(|s| s.as_str()) == Some("idle") {
160            return Ok(None);
161        }
162        serde_json::from_value(val).map(Some).map_err(|e| {
163            VectorizerError::server(format!(
164                "Failed to deserialize cluster_rebalance_status: {e}"
165            ))
166        })
167    }
168}
169
#[cfg(test)]
mod tests {
    // Tests deliberately unwrap: a failed (de)serialization should fail the test.
    #![allow(clippy::unwrap_used)]

    use serde_json::{from_value, json, to_value};

    use crate::models::{
        AddPeerRequest, FailoverReport, PeerInfo, RebalanceJob, ReplicationConfig,
        ReplicationStats, ReplicationStatus, ResyncJob,
    };

    // ── /replication/* models ─────────────────────────────────────────────────

    /// A payload with only `role` and `enabled` leaves `stats` and `replicas` unset.
    #[test]
    fn replication_status_deserializes_standalone() {
        let status: ReplicationStatus = from_value(json!({
            "role": "Standalone",
            "enabled": false
        }))
        .unwrap();

        assert_eq!(status.role, "Standalone");
        assert!(!status.enabled);
        assert!(status.stats.is_none());
        assert!(status.replicas.is_none());
    }

    /// A master payload carrying a `stats` object populates `stats`.
    #[test]
    fn replication_status_deserializes_master() {
        let payload = json!({
            "role": "Master",
            "enabled": true,
            "stats": {
                "master_offset": 100,
                "replica_offset": 95,
                "lag_operations": 5,
                "total_replicated": 500
            },
            "replicas": []
        });

        let status: ReplicationStatus = from_value(payload).unwrap();

        assert_eq!(status.role, "Master");
        assert!(status.enabled);
        assert!(status.stats.is_some());
    }

    /// A master config serializes its set fields and omits `master_address` when `None`.
    #[test]
    fn replication_config_serializes_master() {
        let master = ReplicationConfig {
            role: "master".into(),
            bind_address: Some("0.0.0.0:15010".into()),
            master_address: None,
            heartbeat_interval: Some(1000),
            log_size: None,
        };

        let encoded = to_value(&master).unwrap();

        assert_eq!(encoded["role"], "master");
        assert_eq!(encoded["bind_address"], "0.0.0.0:15010");
        assert_eq!(encoded["heartbeat_interval"], 1000);
        assert!(encoded.get("master_address").is_none());
    }

    /// A replica config serializes its role and the master address it points at.
    #[test]
    fn replication_config_serializes_replica() {
        let replica = ReplicationConfig {
            role: "replica".into(),
            bind_address: None,
            master_address: Some("master.host:15010".into()),
            heartbeat_interval: None,
            log_size: None,
        };

        let encoded = to_value(&replica).unwrap();

        assert_eq!(encoded["role"], "replica");
        assert_eq!(encoded["master_address"], "master.host:15010");
    }

    /// Stats deserialize with both the core counters and the optional extras.
    #[test]
    fn replication_stats_round_trip() {
        let payload = json!({
            "master_offset": 200,
            "replica_offset": 190,
            "lag_operations": 10,
            "total_replicated": 1000,
            "bytes_sent": 4096u64,
            "connected_replicas": 2
        });

        let decoded: ReplicationStats = from_value(payload).unwrap();

        assert_eq!(decoded.master_offset, 200);
        assert_eq!(decoded.lag_operations, 10);
        assert_eq!(decoded.bytes_sent, Some(4096));
        assert_eq!(decoded.connected_replicas, Some(2));
    }

    // ── phase15 cluster admin ─────────────────────────────────────────────────

    /// A failover report deserializes its promotion offsets and residual lag.
    #[test]
    fn failover_report_round_trip() {
        let report: FailoverReport = from_value(json!({
            "promoted_replica_id": "replica-1",
            "master_offset_at_promotion": 1000u64,
            "replica_offset_at_promotion": 999u64,
            "residual_lag_operations": 1u64
        }))
        .unwrap();

        assert_eq!(report.promoted_replica_id, "replica-1");
        assert_eq!(report.master_offset_at_promotion, 1000);
        assert_eq!(report.residual_lag_operations, 1);
    }

    /// A resync job deserializes its target replica, offset and snapshot flag.
    #[test]
    fn resync_job_round_trip() {
        let job: ResyncJob = from_value(json!({
            "replica_id": "replica-2",
            "snapshot_offset": 5000u64,
            "full_snapshot": true
        }))
        .unwrap();

        assert_eq!(job.replica_id, "replica-2");
        assert_eq!(job.snapshot_offset, 5000);
        assert!(job.full_snapshot);
    }

    /// Peer info deserializes the node id and role.
    #[test]
    fn peer_info_round_trip() {
        let peer: PeerInfo = from_value(json!({
            "node_id": "peer-abc",
            "address": "10.0.0.2:15003",
            "role": "member"
        }))
        .unwrap();

        assert_eq!(peer.node_id, "peer-abc");
        assert_eq!(peer.role, "member");
    }

    /// An add-peer request serializes both of its fields under their own names.
    #[test]
    fn add_peer_request_serializes() {
        let request = AddPeerRequest {
            address: "10.0.0.3:15003".into(),
            role: "observer".into(),
        };

        let encoded = to_value(&request).unwrap();

        assert_eq!(encoded["address"], "10.0.0.3:15003");
        assert_eq!(encoded["role"], "observer");
    }

    /// A rebalance job without a checkpoint key leaves `last_checkpoint_node` unset.
    #[test]
    fn rebalance_job_round_trip() {
        let payload = json!({
            "job_id": "job-xyz",
            "status": "running",
            "shards_to_move": 4usize,
            "shards_moved": 1usize,
            "message": "Rebalance started"
        });

        let job: RebalanceJob = from_value(payload).unwrap();

        assert_eq!(job.job_id, "job-xyz");
        assert_eq!(job.status, "running");
        assert_eq!(job.shards_to_move, 4);
        assert!(job.last_checkpoint_node.is_none());
    }
}