Skip to main content

synth_ai/
environment_pools.rs

1use reqwest::header::{HeaderMap, HeaderValue};
2use serde_json::{Map, Value};
3use std::env;
4
5use synth_ai_core::http::HttpError;
6
7use crate::{Error, Result};
8
9/// Client for Synth Environment Pools rollouts.
10#[derive(Debug)]
11pub struct EnvironmentPoolsClient {
12    client: synth_ai_core::SynthClient,
13    api_version: Option<String>,
14}
15
16impl EnvironmentPoolsClient {
17    /// Create a new Environment Pools client.
18    pub fn new(api_key: impl Into<String>, base_url: Option<&str>) -> Result<Self> {
19        let api_key = api_key.into();
20        let client = synth_ai_core::SynthClient::new(&api_key, base_url).map_err(Error::Core)?;
21        Ok(Self {
22            client,
23            api_version: None,
24        })
25    }
26
27    /// Create a client from environment variables.
28    pub fn from_env() -> Result<Self> {
29        let api_key = env::var("SYNTH_API_KEY").map_err(|_| Error::MissingApiKey)?;
30        let base_url = env::var("SYNTH_BACKEND_URL").ok();
31        Self::new(api_key, base_url.as_deref())
32    }
33
34    /// Override the API version used for endpoints.
35    pub fn with_api_version(mut self, version: impl Into<String>) -> Self {
36        self.api_version = Some(version.into());
37        self
38    }
39
40    /// Access the underlying core client.
41    pub fn core(&self) -> &synth_ai_core::SynthClient {
42        &self.client
43    }
44
45    fn resolve_api_version(&self) -> String {
46        if let Some(version) = self.api_version.as_ref() {
47            if !version.trim().is_empty() {
48                return version.clone();
49            }
50        }
51        if let Ok(version) = env::var("ENV_POOLS_API_VERSION") {
52            if !version.trim().is_empty() {
53                return version;
54            }
55        }
56        "v1".to_string()
57    }
58
59    fn public_url(&self, suffix: &str) -> String {
60        let base = self.client.base_url().trim_end_matches('/');
61        format!("{base}/v1/{}", suffix.trim_start_matches('/'))
62    }
63
64    fn legacy_path(&self, suffix: &str) -> String {
65        format!(
66            "/api/v1/environment-pools/{}",
67            suffix.trim_start_matches('/')
68        )
69    }
70
71    fn idempotency_headers(idempotency_key: Option<&str>) -> Option<HeaderMap> {
72        let key = idempotency_key.filter(|value| !value.trim().is_empty())?;
73        let mut headers = HeaderMap::new();
74        if let Ok(value) = HeaderValue::from_str(key) {
75            headers.insert("Idempotency-Key", value);
76        }
77        Some(headers)
78    }
79
80    async fn post_json_with_fallback(
81        &self,
82        suffix: &str,
83        body: &Value,
84        idempotency_key: Option<&str>,
85    ) -> Result<Value> {
86        let version = self.resolve_api_version();
87        let public_url = self.public_url(suffix);
88        let legacy_path = self.legacy_path(suffix);
89        let headers = Self::idempotency_headers(idempotency_key);
90        let attempts = if version == "v1" {
91            vec![public_url, legacy_path]
92        } else {
93            vec![legacy_path, public_url]
94        };
95
96        let mut last_error: Option<HttpError> = None;
97        for path in attempts {
98            let response = self
99                .client
100                .http()
101                .post_json_with_headers::<Value>(&path, body, headers.clone())
102                .await;
103            match response {
104                Ok(value) => return Ok(value),
105                Err(err) => {
106                    if err.status() == Some(404) {
107                        last_error = Some(err);
108                        continue;
109                    }
110                    return Err(Error::Core(err.into()));
111                }
112            }
113        }
114
115        Err(Error::Core(
116            last_error
117                .unwrap_or_else(|| {
118                    HttpError::InvalidUrl("no env pools endpoints available".to_string())
119                })
120                .into(),
121        ))
122    }
123
124    async fn get_json_with_fallback(&self, suffix: &str) -> Result<Value> {
125        let version = self.resolve_api_version();
126        let public_url = self.public_url(suffix);
127        let legacy_path = self.legacy_path(suffix);
128        let attempts = if version == "v1" {
129            vec![public_url, legacy_path]
130        } else {
131            vec![legacy_path, public_url]
132        };
133
134        let mut last_error: Option<HttpError> = None;
135        for path in attempts {
136            let response = self.client.http().get_json(&path, None).await;
137            match response {
138                Ok(value) => return Ok(value),
139                Err(err) => {
140                    if err.status() == Some(404) {
141                        last_error = Some(err);
142                        continue;
143                    }
144                    return Err(Error::Core(err.into()));
145                }
146            }
147        }
148
149        Err(Error::Core(
150            last_error
151                .unwrap_or_else(|| {
152                    HttpError::InvalidUrl("no env pools endpoints available".to_string())
153                })
154                .into(),
155        ))
156    }
157
158    /// Create a rollout in Environment Pools.
159    pub async fn create_rollout(
160        &self,
161        mut request: Value,
162        idempotency_key: Option<&str>,
163        dry_run: Option<bool>,
164    ) -> Result<Value> {
165        if let Some(dry_run) = dry_run {
166            if let Value::Object(map) = &mut request {
167                map.insert("dry_run".to_string(), Value::Bool(dry_run));
168            }
169        }
170        self.post_json_with_fallback("rollouts", &request, idempotency_key)
171            .await
172    }
173
174    /// Create a batch of rollouts.
175    pub async fn create_rollouts_batch(
176        &self,
177        requests: Vec<Value>,
178        metadata: Option<Value>,
179        idempotency_key: Option<&str>,
180    ) -> Result<Value> {
181        let mut payload = Map::new();
182        payload.insert("requests".to_string(), Value::Array(requests));
183        if let Some(metadata) = metadata {
184            payload.insert("metadata".to_string(), metadata);
185        }
186        self.post_json_with_fallback("rollouts/batch", &Value::Object(payload), idempotency_key)
187            .await
188    }
189
190    /// Fetch rollout status/details.
191    pub async fn get_rollout(&self, rollout_id: &str) -> Result<Value> {
192        let suffix = format!("rollouts/{}", rollout_id);
193        self.get_json_with_fallback(&suffix).await
194    }
195}