database_replicator/remote/
client.rs

1// ABOUTME: HTTP client for communicating with remote execution API
2// ABOUTME: Handles job submission, status polling, and error handling
3
4use anyhow::{Context, Result};
5use reqwest::Client;
6use std::time::Duration;
7
8use super::models::{JobResponse, JobSpec, JobStatus};
9
10pub struct RemoteClient {
11    client: Client,
12    api_base_url: String,
13    api_key: Option<String>,
14}
15
16impl RemoteClient {
17    pub fn new(api_base_url: String, api_key: Option<String>) -> Result<Self> {
18        let client = Client::builder()
19            .timeout(Duration::from_secs(30))
20            .build()
21            .context("Failed to create HTTP client")?;
22
23        Ok(Self {
24            client,
25            api_base_url,
26            api_key,
27        })
28    }
29
30    pub async fn submit_job(&self, spec: &JobSpec) -> Result<JobResponse> {
31        let url = format!("{}/jobs", self.api_base_url);
32
33        let mut request = self.client.post(&url).json(spec);
34
35        // Add API key header if provided
36        if let Some(ref key) = self.api_key {
37            request = request.header("x-api-key", key);
38        }
39
40        let response = request
41            .send()
42            .await
43            .context("Failed to submit job to remote service. If the service is unavailable, you can use --local to run replication on your machine instead")?;
44
45        if !response.status().is_success() {
46            let status = response.status();
47            let body = response.text().await.unwrap_or_default();
48
49            // Special handling for authentication errors
50            if status == 401 {
51                anyhow::bail!(
52                    "Authentication failed. Your API key may be invalid or expired.\n\
53                    Generate a new key at: https://console.serendb.com/api-keys\n\
54                    Or use --local to run replication on your machine instead"
55                );
56            }
57
58            anyhow::bail!("Job submission failed with status {}: {}. If the remote service is unavailable, you can use --local to run replication on your machine instead", status, body);
59        }
60
61        let job_response: JobResponse = response
62            .json()
63            .await
64            .context("Failed to parse job response")?;
65
66        Ok(job_response)
67    }
68
69    pub async fn get_job_status(&self, job_id: &str) -> Result<JobStatus> {
70        let url = format!("{}/jobs/{}", self.api_base_url, job_id);
71
72        let mut request = self.client.get(&url);
73
74        // Add API key header if provided
75        if let Some(ref key) = self.api_key {
76            request = request.header("x-api-key", key);
77        }
78
79        let response = request.send().await.context(
80            "Failed to get job status from remote service. The remote service may be unavailable",
81        )?;
82
83        if !response.status().is_success() {
84            let status = response.status();
85            let body = response.text().await.unwrap_or_default();
86
87            // Special handling for authentication errors
88            if status == 401 {
89                anyhow::bail!(
90                    "Authentication failed. Your API key may be invalid or expired.\n\
91                    Generate a new key at: https://console.serendb.com/api-keys"
92                );
93            }
94
95            anyhow::bail!(
96                "Failed to get job status {}: {}. The remote service may be experiencing issues",
97                status,
98                body
99            );
100        }
101
102        let job_status: JobStatus = response
103            .json()
104            .await
105            .context("Failed to parse job status")?;
106
107        Ok(job_status)
108    }
109
110    pub async fn poll_until_complete(
111        &self,
112        job_id: &str,
113        callback: impl Fn(&JobStatus),
114    ) -> Result<JobStatus> {
115        loop {
116            let status = self.get_job_status(job_id).await?;
117            callback(&status);
118
119            match status.status.as_str() {
120                "completed" | "failed" => return Ok(status),
121                _ => {
122                    tokio::time::sleep(Duration::from_secs(5)).await;
123                }
124            }
125        }
126    }
127}
128
129#[cfg(test)]
130mod tests {
131    use super::*;
132
133    #[test]
134    fn test_client_creation() {
135        let client = RemoteClient::new("https://api.example.com".to_string(), None);
136        assert!(client.is_ok());
137    }
138
139    #[test]
140    fn test_client_creation_with_api_key() {
141        let client = RemoteClient::new(
142            "https://api.example.com".to_string(),
143            Some("test-key".to_string()),
144        );
145        assert!(client.is_ok());
146    }
147}