database_replicator/remote/
client.rs

1// ABOUTME: HTTP client for communicating with remote execution API
2// ABOUTME: Handles job submission, status polling, and error handling
3
4use anyhow::{Context, Result};
5use reqwest::Client;
6use std::time::Duration;
7
8use super::models::{JobResponse, JobSpec, JobStatus};
9
10#[derive(Clone)]
11pub struct RemoteClient {
12    client: Client,
13    api_base_url: String,
14    api_key: Option<String>,
15}
16
17impl RemoteClient {
18    pub fn new(api_base_url: String, api_key: Option<String>) -> Result<Self> {
19        let client = Client::builder()
20            .timeout(Duration::from_secs(30))
21            .build()
22            .context("Failed to create HTTP client")?;
23
24        Ok(Self {
25            client,
26            api_base_url,
27            api_key,
28        })
29    }
30
31    pub async fn submit_job(&self, spec: &JobSpec) -> Result<JobResponse> {
32        let url = format!("{}/jobs", self.api_base_url);
33
34        let mut request = self.client.post(&url).json(spec);
35
36        // Add API key header if provided
37        if let Some(ref key) = self.api_key {
38            request = request.header("x-api-key", key);
39        }
40
41        let response = request
42            .send()
43            .await
44            .context("Failed to submit job to remote service. If the service is unavailable, you can use --local to run replication on your machine instead")?;
45
46        if !response.status().is_success() {
47            let status = response.status();
48            let body = response.text().await.unwrap_or_default();
49
50            // Special handling for authentication errors
51            if status == 401 {
52                anyhow::bail!(
53                    "Authentication failed. Your API key may be invalid or expired.\n\
54                    Generate a new key at: https://console.serendb.com/api-keys\n\
55                    Or use --local to run replication on your machine instead"
56                );
57            }
58
59            anyhow::bail!("Job submission failed with status {}: {}. If the remote service is unavailable, you can use --local to run replication on your machine instead", status, body);
60        }
61
62        let job_response: JobResponse = response
63            .json()
64            .await
65            .context("Failed to parse job response")?;
66
67        Ok(job_response)
68    }
69
70    pub async fn get_job_status(&self, job_id: &str) -> Result<JobStatus> {
71        let url = format!("{}/jobs/{}", self.api_base_url, job_id);
72
73        let mut request = self.client.get(&url);
74
75        // Add API key header if provided
76        if let Some(ref key) = self.api_key {
77            request = request.header("x-api-key", key);
78        }
79
80        let response = request.send().await.context(
81            "Failed to get job status from remote service. The remote service may be unavailable",
82        )?;
83
84        if !response.status().is_success() {
85            let status = response.status();
86            let body = response.text().await.unwrap_or_default();
87
88            // Special handling for authentication errors
89            if status == 401 {
90                anyhow::bail!(
91                    "Authentication failed. Your API key may be invalid or expired.\n\
92                    Generate a new key at: https://console.serendb.com/api-keys"
93                );
94            }
95
96            anyhow::bail!(
97                "Failed to get job status {}: {}. The remote service may be experiencing issues",
98                status,
99                body
100            );
101        }
102
103        let job_status: JobStatus = response
104            .json()
105            .await
106            .context("Failed to parse job status")?;
107
108        Ok(job_status)
109    }
110
111    pub async fn poll_until_complete(
112        &self,
113        job_id: &str,
114        callback: impl Fn(&JobStatus),
115    ) -> Result<JobStatus> {
116        loop {
117            let status = self.get_job_status(job_id).await?;
118            callback(&status);
119
120            match status.status.as_str() {
121                "completed" | "failed" => return Ok(status),
122                _ => {
123                    tokio::time::sleep(Duration::from_secs(5)).await;
124                }
125            }
126        }
127    }
128}
129
#[cfg(test)]
mod tests {
    use super::*;

    /// Base URL shared by the construction tests; no request is sent to it.
    const BASE_URL: &str = "https://api.example.com";

    /// Building a client without an API key succeeds.
    #[test]
    fn test_client_creation() {
        let built = RemoteClient::new(String::from(BASE_URL), None);
        assert!(built.is_ok());
    }

    /// Building a client with an API key succeeds.
    #[test]
    fn test_client_creation_with_api_key() {
        let api_key = Some(String::from("test-key"));
        let built = RemoteClient::new(String::from(BASE_URL), api_key);
        assert!(built.is_ok());
    }
}