database_replicator/serendb/
client.rs

1// ABOUTME: HTTP client for SerenDB Console API
2// ABOUTME: Manages project settings like logical replication
3
4use anyhow::{Context, Result};
5use reqwest::Client;
6use serde::{Deserialize, Serialize};
7
8use crate::utils::replace_database_in_connection_string;
9
/// Base URL of the SerenDB Console API that `ConsoleClient::new` falls back to
/// when the caller does not supply one.
pub const DEFAULT_CONSOLE_API_URL: &str = "https://api.serendb.com";
12
13/// SerenDB Console API client
14pub struct ConsoleClient {
15    client: Client,
16    api_base_url: String,
17    api_key: String,
18}
19
20/// Project information from SerenDB Console API
21#[derive(Debug, Clone, Deserialize)]
22pub struct Project {
23    pub id: String,
24    pub name: String,
25    pub enable_logical_replication: bool,
26    #[serde(default)]
27    pub organization_id: Option<String>,
28}
29
30/// Branch information from SerenDB Console API
31#[allow(dead_code)]
32#[derive(Debug, Clone, Deserialize)]
33pub struct Branch {
34    pub id: String,
35    pub name: String,
36    pub project_id: String,
37    #[serde(default)]
38    pub is_default: bool,
39}
40
41/// Database information from SerenDB Console API
42#[allow(dead_code)]
43#[derive(Debug, Clone, Deserialize)]
44pub struct Database {
45    pub id: String,
46    pub name: String,
47    pub branch_id: String,
48}
49
50/// Connection string response payload
51#[allow(dead_code)]
52#[derive(Debug, Deserialize)]
53pub struct ConnectionStringResponse {
54    pub connection_string: String,
55}
56
57/// Request payload to create a database
58#[allow(dead_code)]
59#[derive(Debug, Serialize)]
60pub struct CreateDatabaseRequest {
61    pub name: String,
62}
63
64/// Paginated response wrapper from the Console API
65#[allow(dead_code)]
66#[derive(Debug, Deserialize)]
67pub struct PaginatedResponse<T> {
68    pub data: Vec<T>,
69    #[serde(default)]
70    pub pagination: Option<Pagination>,
71}
72
73/// Pagination metadata returned by the Console API
74#[allow(dead_code)]
75#[derive(Debug, Deserialize, Default)]
76pub struct Pagination {
77    #[serde(default)]
78    pub total: i64,
79    #[serde(default)]
80    pub page: i64,
81    #[serde(default)]
82    pub per_page: i64,
83}
84
85/// Wrapper for API responses
86#[derive(Debug, Deserialize)]
87pub struct DataResponse<T> {
88    pub data: T,
89}
90
91/// Request to update project settings
92#[derive(Debug, Serialize)]
93pub struct UpdateProjectRequest {
94    #[serde(skip_serializing_if = "Option::is_none")]
95    pub enable_logical_replication: Option<bool>,
96}
97
98impl ConsoleClient {
99    /// Create a new Console API client
100    ///
101    /// # Arguments
102    ///
103    /// * `api_base_url` - Optional base URL (defaults to https://api.serendb.com)
104    /// * `api_key` - SerenDB API key (format: seren_<key_id>_<secret>)
105    pub fn new(api_base_url: Option<&str>, api_key: String) -> Self {
106        Self {
107            client: Client::new(),
108            api_base_url: api_base_url
109                .unwrap_or(DEFAULT_CONSOLE_API_URL)
110                .trim_end_matches('/')
111                .to_string(),
112            api_key,
113        }
114    }
115
116    /// List all projects accessible to the authenticated user
117    ///
118    /// # Returns
119    ///
120    /// Vector of projects the user has access to
121    ///
122    /// # Examples
123    /// ```ignore
124    /// let client = ConsoleClient::new(None, "seren_key".to_string());
125    /// let projects = client.list_projects().await?;
126    /// for project in projects {
127    ///     println!("{}: {}", project.id, project.name);
128    /// }
129    /// ```
130    pub async fn list_projects(&self) -> Result<Vec<Project>> {
131        let url = format!("{}/api/projects", self.api_base_url);
132
133        let response = self
134            .client
135            .get(&url)
136            .header("Authorization", format!("Bearer {}", self.api_key))
137            .header("Content-Type", "application/json")
138            .send()
139            .await
140            .context("Failed to send request to SerenDB Console API")?;
141
142        self.handle_common_errors(&response).await?;
143
144        if !response.status().is_success() {
145            let status = response.status();
146            let body = response.text().await.unwrap_or_default();
147            anyhow::bail!("SerenDB Console API returned error {}: {}", status, body);
148        }
149
150        let data: PaginatedResponse<Project> = response
151            .json()
152            .await
153            .context("Failed to parse projects response from SerenDB Console API")?;
154
155        Ok(data.data)
156    }
157
158    /// List all branches for a project
159    pub async fn list_branches(&self, project_id: &str) -> Result<Vec<Branch>> {
160        let url = format!("{}/api/projects/{}/branches", self.api_base_url, project_id);
161
162        let response = self
163            .client
164            .get(&url)
165            .header("Authorization", format!("Bearer {}", self.api_key))
166            .header("Content-Type", "application/json")
167            .send()
168            .await
169            .context("Failed to send request to SerenDB Console API")?;
170
171        self.handle_common_errors(&response).await?;
172
173        if !response.status().is_success() {
174            let status = response.status();
175            let body = response.text().await.unwrap_or_default();
176            anyhow::bail!("SerenDB Console API returned error {}: {}", status, body);
177        }
178
179        let data: PaginatedResponse<Branch> = response
180            .json()
181            .await
182            .context("Failed to parse branches response from SerenDB Console API")?;
183
184        Ok(data.data)
185    }
186
187    /// Get the default branch for a project
188    ///
189    /// Returns the branch marked as default, or the first branch if none are marked.
190    pub async fn get_default_branch(&self, project_id: &str) -> Result<Branch> {
191        let branches = self.list_branches(project_id).await?;
192        select_default_branch(project_id, branches)
193    }
194
195    /// List all databases within a SerenDB branch
196    pub async fn list_databases(&self, project_id: &str, branch_id: &str) -> Result<Vec<Database>> {
197        let url = format!(
198            "{}/api/projects/{}/branches/{}/databases",
199            self.api_base_url, project_id, branch_id
200        );
201
202        let response = self
203            .client
204            .get(&url)
205            .header("Authorization", format!("Bearer {}", self.api_key))
206            .header("Content-Type", "application/json")
207            .send()
208            .await
209            .context("Failed to send request to SerenDB Console API")?;
210
211        self.handle_common_errors(&response).await?;
212
213        if !response.status().is_success() {
214            let status = response.status();
215            let body = response.text().await.unwrap_or_default();
216            anyhow::bail!("SerenDB Console API returned error {}: {}", status, body);
217        }
218
219        let data: PaginatedResponse<Database> = response
220            .json()
221            .await
222            .context("Failed to parse databases response from SerenDB Console API")?;
223
224        Ok(data.data)
225    }
226
227    /// Create a new SerenDB database inside a branch
228    pub async fn create_database(
229        &self,
230        project_id: &str,
231        branch_id: &str,
232        name: &str,
233    ) -> Result<Database> {
234        let url = format!(
235            "{}/api/projects/{}/branches/{}/databases",
236            self.api_base_url, project_id, branch_id
237        );
238
239        let request = CreateDatabaseRequest {
240            name: name.to_string(),
241        };
242
243        let response = self
244            .client
245            .post(&url)
246            .header("Authorization", format!("Bearer {}", self.api_key))
247            .header("Content-Type", "application/json")
248            .json(&request)
249            .send()
250            .await
251            .context("Failed to send request to SerenDB Console API")?;
252
253        self.handle_common_errors(&response).await?;
254
255        if !response.status().is_success() {
256            let status = response.status();
257            let body = response.text().await.unwrap_or_default();
258            anyhow::bail!(
259                "Failed to create database '{}': {} - {}",
260                name,
261                status,
262                body
263            );
264        }
265
266        let data: DataResponse<Database> = response
267            .json()
268            .await
269            .context("Failed to parse create database response from SerenDB Console API")?;
270
271        Ok(data.data)
272    }
273
274    /// Get a connection string for a branch/database combination
275    pub async fn get_connection_string(
276        &self,
277        project_id: &str,
278        branch_id: &str,
279        database: &str,
280        pooled: bool,
281    ) -> Result<String> {
282        let url = format!(
283            "{}/api/projects/{}/branches/{}/connection-string?pooled={}",
284            self.api_base_url, project_id, branch_id, pooled
285        );
286
287        let response = self
288            .client
289            .get(&url)
290            .header("Authorization", format!("Bearer {}", self.api_key))
291            .header("Content-Type", "application/json")
292            .send()
293            .await
294            .context("Failed to send request to SerenDB Console API")?;
295
296        self.handle_common_errors_with_context(
297            &response,
298            Some("Branch has no compute endpoint. Select a different branch or create an endpoint at console.serendb.com.".to_string()),
299        )
300        .await?;
301
302        if !response.status().is_success() {
303            let status = response.status();
304            let body = response.text().await.unwrap_or_default();
305            anyhow::bail!("SerenDB Console API returned error {}: {}", status, body);
306        }
307
308        let data: DataResponse<ConnectionStringResponse> = response
309            .json()
310            .await
311            .context("Failed to parse connection string response from SerenDB Console API")?;
312
313        replace_database_in_connection_string(&data.data.connection_string, database)
314    }
315
316    /// Get project information by ID
317    ///
318    /// # Arguments
319    ///
320    /// * `project_id` - UUID string of the project
321    ///
322    /// # Returns
323    ///
324    /// Project information including logical replication status
325    pub async fn get_project(&self, project_id: &str) -> Result<Project> {
326        let url = format!("{}/api/projects/{}", self.api_base_url, project_id);
327
328        let response = self
329            .client
330            .get(&url)
331            .header("Authorization", format!("Bearer {}", self.api_key))
332            .header("Content-Type", "application/json")
333            .send()
334            .await
335            .context("Failed to send request to SerenDB Console API")?;
336
337        self.handle_common_errors_with_context(
338            &response,
339            Some(format!(
340                "Project {} not found.\n\
341                 Verify the project ID is correct and you have access to it.",
342                project_id
343            )),
344        )
345        .await?;
346
347        if !response.status().is_success() {
348            let status = response.status();
349            let body = response.text().await.unwrap_or_default();
350            anyhow::bail!("SerenDB Console API returned error {}: {}", status, body);
351        }
352
353        let data: DataResponse<Project> = response
354            .json()
355            .await
356            .context("Failed to parse project response from SerenDB Console API")?;
357
358        Ok(data.data)
359    }
360
361    /// Enable logical replication for a project
362    ///
363    /// **Warning**: This action cannot be undone. Once enabled, logical replication
364    /// cannot be disabled. Enabling will briefly suspend all active endpoints.
365    ///
366    /// # Arguments
367    ///
368    /// * `project_id` - UUID string of the project
369    ///
370    /// # Returns
371    ///
372    /// Updated project information
373    pub async fn enable_logical_replication(&self, project_id: &str) -> Result<Project> {
374        let url = format!("{}/api/projects/{}", self.api_base_url, project_id);
375
376        let request = UpdateProjectRequest {
377            enable_logical_replication: Some(true),
378        };
379
380        let response = self
381            .client
382            .patch(&url)
383            .header("Authorization", format!("Bearer {}", self.api_key))
384            .header("Content-Type", "application/json")
385            .json(&request)
386            .send()
387            .await
388            .context("Failed to send request to SerenDB Console API")?;
389
390        self.handle_common_errors_with_context(
391            &response,
392            Some(format!(
393                "Project {} not found.\n\
394                 Verify the project ID is correct and you have access to it.",
395                project_id
396            )),
397        )
398        .await?;
399
400        if !response.status().is_success() {
401            let status = response.status();
402            let body = response.text().await.unwrap_or_default();
403            anyhow::bail!(
404                "Failed to enable logical replication. SerenDB Console API returned {}: {}",
405                status,
406                body
407            );
408        }
409
410        let data: DataResponse<Project> = response
411            .json()
412            .await
413            .context("Failed to parse project response from SerenDB Console API")?;
414
415        Ok(data.data)
416    }
417
418    /// Check if logical replication is enabled for a project
419    ///
420    /// # Arguments
421    ///
422    /// * `project_id` - UUID string of the project
423    ///
424    /// # Returns
425    ///
426    /// true if logical replication is enabled, false otherwise
427    pub async fn is_logical_replication_enabled(&self, project_id: &str) -> Result<bool> {
428        let project = self.get_project(project_id).await?;
429        Ok(project.enable_logical_replication)
430    }
431
432    /// Find a project by matching the target hostname against project connection strings
433    ///
434    /// This is useful when the user provides a direct connection string (e.g., from
435    /// `init` output) but doesn't have saved target state with the project_id.
436    ///
437    /// # Arguments
438    ///
439    /// * `target_hostname` - The hostname to search for (e.g., "ep-xyz.serendb.com")
440    ///
441    /// # Returns
442    ///
443    /// The project_id if a matching project is found, None otherwise
444    pub async fn find_project_by_hostname(&self, target_hostname: &str) -> Result<Option<String>> {
445        let target_host_lower = target_hostname.to_lowercase();
446
447        // List all projects accessible to this API key
448        let projects = self.list_projects().await?;
449
450        for project in projects {
451            // Get the default branch for each project
452            let branch = match self.get_default_branch(&project.id).await {
453                Ok(b) => b,
454                Err(_) => continue, // Skip projects without branches
455            };
456
457            // Get connection string for this branch (use serendb as dummy database)
458            let conn_str = match self
459                .get_connection_string(&project.id, &branch.id, "serendb", false)
460                .await
461            {
462                Ok(s) => s,
463                Err(_) => continue, // Skip branches without endpoints
464            };
465
466            // Extract hostname from the connection string
467            if let Ok(parts) = crate::utils::parse_postgres_url(&conn_str) {
468                if parts.host.to_lowercase() == target_host_lower {
469                    tracing::info!(
470                        "Found matching project '{}' ({}) for hostname {}",
471                        project.name,
472                        project.id,
473                        target_hostname
474                    );
475                    return Ok(Some(project.id));
476                }
477            }
478        }
479
480        tracing::debug!("No project found matching hostname {}", target_hostname);
481        Ok(None)
482    }
483
484    async fn handle_common_errors(&self, response: &reqwest::Response) -> Result<()> {
485        self.handle_common_errors_with_context(response, None).await
486    }
487
488    async fn handle_common_errors_with_context(
489        &self,
490        response: &reqwest::Response,
491        not_found_message: Option<String>,
492    ) -> Result<()> {
493        if response.status() == reqwest::StatusCode::UNAUTHORIZED {
494            anyhow::bail!(
495                "SerenDB API key is invalid or expired.\n\
496                 Generate a new key at: https://console.serendb.com/api-keys"
497            );
498        }
499
500        if response.status() == reqwest::StatusCode::NOT_FOUND {
501            if let Some(message) = not_found_message {
502                anyhow::bail!(message);
503            } else {
504                anyhow::bail!("Resource not found. Verify the ID is correct and you have access.");
505            }
506        }
507
508        Ok(())
509    }
510}
511
512fn select_default_branch(project_id: &str, branches: Vec<Branch>) -> Result<Branch> {
513    if branches.is_empty() {
514        anyhow::bail!("Project {} has no branches", project_id);
515    }
516
517    if let Some(default_branch) = branches.iter().find(|branch| branch.is_default) {
518        return Ok(default_branch.clone());
519    }
520
521    Ok(branches.into_iter().next().expect("branches is not empty"))
522}
523
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a `Branch` fixture that belongs to project "proj".
    fn branch(id: &str, name: &str, is_default: bool) -> Branch {
        Branch {
            id: id.into(),
            name: name.into(),
            project_id: "proj".into(),
            is_default,
        }
    }

    /// Without an override the client targets the default Console API URL.
    #[test]
    fn test_client_creation() {
        let client = ConsoleClient::new(None, String::from("seren_test_key"));
        assert_eq!(client.api_base_url, DEFAULT_CONSOLE_API_URL);
    }

    /// A custom base URL is stored with its trailing slash removed.
    #[test]
    fn test_client_custom_url() {
        let api_key = String::from("seren_test_key");
        let client = ConsoleClient::new(Some("https://custom.serendb.com/"), api_key);
        assert_eq!(client.api_base_url, "https://custom.serendb.com");
    }

    /// The update payload serializes the replication flag.
    #[test]
    fn test_update_request_serialization() {
        let json = serde_json::to_string(&UpdateProjectRequest {
            enable_logical_replication: Some(true),
        })
        .unwrap();
        assert!(json.contains("enable_logical_replication"));
        assert!(json.contains("true"));
    }

    /// A branch payload deserializes, including the default flag.
    #[test]
    fn test_branch_deserialization() {
        let parsed: Branch = serde_json::from_str(
            r#"{"id": "abc", "name": "main", "project_id": "xyz", "is_default": true}"#,
        )
        .unwrap();
        assert_eq!(parsed.name, "main");
        assert!(parsed.is_default);
    }

    /// A database payload deserializes.
    #[test]
    fn test_database_deserialization() {
        let parsed: Database =
            serde_json::from_str(r#"{"id": "db1", "name": "myapp", "branch_id": "br1"}"#).unwrap();
        assert_eq!(parsed.name, "myapp");
        assert_eq!(parsed.branch_id, "br1");
    }

    /// The flagged branch wins even when it is not first.
    #[test]
    fn test_select_default_branch_prefers_flagged_branch() {
        let candidates = vec![
            branch("br1", "preview", false),
            branch("br2", "main", true),
        ];

        let chosen = select_default_branch("proj", candidates).unwrap();
        assert_eq!(chosen.id, "br2");
        assert_eq!(chosen.name, "main");
    }

    /// With no flagged branch the first one is returned.
    #[test]
    fn test_select_default_branch_falls_back_to_first() {
        let candidates = vec![
            branch("br1", "alpha", false),
            branch("br2", "beta", false),
        ];

        let chosen = select_default_branch("proj", candidates).unwrap();
        assert_eq!(chosen.id, "br1");
        assert_eq!(chosen.name, "alpha");
    }

    /// An empty branch list is an error.
    #[test]
    fn test_select_default_branch_errors_when_empty() {
        let err = select_default_branch("proj", Vec::new()).unwrap_err();
        assert!(format!("{err}").contains("has no branches"));
    }

    /// Only the database segment of the connection string is replaced.
    #[test]
    fn test_replace_database_in_connection_string() {
        let source =
            "postgresql://user:pass@host.serendb.com:5432/serendb?sslmode=require&foo=bar";
        let rewritten =
            replace_database_in_connection_string(source, "myapp").expect("replace succeeds");
        assert!(rewritten.contains("/myapp?"));
        assert!(rewritten.starts_with("postgresql://user:pass@host.serendb.com:5432/"));
        assert!(rewritten.ends_with("sslmode=require&foo=bar"));
    }
}