Skip to main content

deslicer_cli/
observer_client.rs

1use crate::errors::CliError;
2use reqwest::Method;
3use serde::{Deserialize, Serialize};
4use std::time::Duration;
5
6#[derive(Debug, Clone, Serialize, Deserialize)]
7pub struct ChangePlan {
8    pub id: String,
9    #[serde(default)]
10    pub status: String,
11    #[serde(default)]
12    pub summary: Option<String>,
13}
14
15#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct PlanProgress {
17    pub id: String,
18    #[serde(default)]
19    pub status: String,
20}
21
22#[derive(Debug, Clone, Copy)]
23pub enum ReconcileMode {
24    PlanOnly,
25    Apply,
26}
27
28impl ReconcileMode {
29    fn as_str(self) -> &'static str {
30        match self {
31            ReconcileMode::PlanOnly => "plan-only",
32            ReconcileMode::Apply => "apply",
33        }
34    }
35}
36
37pub struct Client {
38    base: url::Url,
39    token: String,
40    http: reqwest::Client,
41}
42
43impl Client {
44    pub fn new(base: url::Url, token: String) -> Self {
45        Self {
46            base,
47            token,
48            http: reqwest::Client::new(),
49        }
50    }
51
52    pub async fn reconcile(
53        &self,
54        environment: &Option<String>,
55        mode: ReconcileMode,
56    ) -> Result<ChangePlan, CliError> {
57        #[derive(Serialize)]
58        struct ReconcileBody<'a> {
59            #[serde(skip_serializing_if = "Option::is_none")]
60            environment: Option<&'a str>,
61            mode: &'static str,
62        }
63
64        let body = ReconcileBody {
65            environment: environment.as_deref(),
66            mode: mode.as_str(),
67        };
68        self.post_json("api/v1/state/reconcile", &body).await
69    }
70
71    pub async fn get_plan(&self, plan_id: &str) -> Result<ChangePlan, CliError> {
72        let path = format!("api/v1/plans/{plan_id}");
73        self.get_json(&path).await
74    }
75
76    pub async fn list_plans(&self, environment: Option<&str>) -> Result<Vec<ChangePlan>, CliError> {
77        let mut path = "api/v1/plans".to_string();
78        if let Some(env) = environment {
79            let encoded = url::form_urlencoded::byte_serialize(env.as_bytes()).collect::<String>();
80            path.push_str(&format!("?environment={encoded}"));
81        }
82        self.get_plans(&path).await
83    }
84
85    pub async fn approve(&self, plan_id: &str) -> Result<ChangePlan, CliError> {
86        let path = format!("api/v1/plans/{plan_id}/approve");
87        self.post_json_empty(&path).await
88    }
89
90    pub async fn reject(&self, plan_id: &str) -> Result<ChangePlan, CliError> {
91        let path = format!("api/v1/plans/{plan_id}/reject");
92        self.post_json_empty(&path).await
93    }
94
95    pub async fn execute(&self, plan_id: &str) -> Result<ChangePlan, CliError> {
96        let path = format!("api/v1/plans/{plan_id}/execute");
97        self.post_json_empty(&path).await
98    }
99
100    pub async fn progress(&self, plan_id: &str) -> Result<PlanProgress, CliError> {
101        let path = format!("api/v1/plans/{plan_id}/progress");
102        self.get_json(&path).await
103    }
104
105    async fn post_json_empty(&self, path: &str) -> Result<ChangePlan, CliError> {
106        self.request_json(Method::POST, path, None::<&()>).await
107    }
108
109    async fn post_json<T: Serialize + ?Sized>(
110        &self,
111        path: &str,
112        body: &T,
113    ) -> Result<ChangePlan, CliError> {
114        self.request_json(Method::POST, path, Some(body)).await
115    }
116
117    async fn get_json<T: for<'de> Deserialize<'de>>(&self, path: &str) -> Result<T, CliError> {
118        self.request_json(Method::GET, path, None::<&()>).await
119    }
120
121    async fn get_plans(&self, path: &str) -> Result<Vec<ChangePlan>, CliError> {
122        let bytes = self.request_bytes(Method::GET, path, None::<&()>).await?;
123        if let Ok(plans) = serde_json::from_slice::<Vec<ChangePlan>>(&bytes) {
124            return Ok(plans);
125        }
126        #[derive(Deserialize)]
127        struct PlansWrapper {
128            plans: Vec<ChangePlan>,
129        }
130        serde_json::from_slice::<PlansWrapper>(&bytes)
131            .map(|w| w.plans)
132            .map_err(|e| CliError::Transport(format!("invalid plans JSON: {e}")))
133    }
134
135    async fn request_json<T, B>(
136        &self,
137        method: Method,
138        path: &str,
139        body: Option<&B>,
140    ) -> Result<T, CliError>
141    where
142        T: for<'de> Deserialize<'de>,
143        B: Serialize + ?Sized,
144    {
145        let bytes = self.request_bytes(method, path, body).await?;
146        serde_json::from_slice(&bytes)
147            .map_err(|e| CliError::Transport(format!("invalid JSON response: {e}")))
148    }
149
150    async fn request_bytes<B>(
151        &self,
152        method: Method,
153        path: &str,
154        body: Option<&B>,
155    ) -> Result<Vec<u8>, CliError>
156    where
157        B: Serialize + ?Sized,
158    {
159        const MAX_ATTEMPTS: u32 = 3;
160        const BACKOFF_BASE_MS: u64 = 500;
161
162        let url = join_api_path(&self.base, path)?;
163        let mut attempt = 0u32;
164
165        loop {
166            attempt += 1;
167            let mut req = self
168                .http
169                .request(method.clone(), url.clone())
170                .header("Authorization", format!("Bearer {}", self.token));
171            if let Some(payload) = body {
172                req = req.json(payload);
173            }
174
175            let response = req
176                .send()
177                .await
178                .map_err(|e| CliError::Transport(e.to_string()))?;
179
180            let status = response.status();
181            if (status.is_server_error() || status == reqwest::StatusCode::TOO_MANY_REQUESTS)
182                && attempt < MAX_ATTEMPTS
183            {
184                let delay = retry_delay(response.headers(), attempt, BACKOFF_BASE_MS);
185                tokio::time::sleep(delay).await;
186                continue;
187            }
188
189            let retry_after = parse_retry_after_header(response.headers());
190            let bytes = response
191                .bytes()
192                .await
193                .map_err(|e| CliError::Transport(e.to_string()))?;
194
195            if status.is_success() {
196                return Ok(bytes.to_vec());
197            }
198
199            let body_text = String::from_utf8_lossy(&bytes).into_owned();
200            return Err(map_observer_error(status, &body_text, retry_after));
201        }
202    }
203}
204
205fn join_api_path(base: &url::Url, path: &str) -> Result<url::Url, CliError> {
206    base.join(path)
207        .map_err(|e| CliError::Transport(format!("invalid URL join: {e}")))
208}
209
210fn retry_delay(headers: &reqwest::header::HeaderMap, attempt: u32, base_ms: u64) -> Duration {
211    if let Some(secs) = parse_retry_after_header(headers) {
212        return Duration::from_secs(secs);
213    }
214    let multiplier = 1u64.checked_shl(attempt.saturating_sub(1)).unwrap_or(1);
215    Duration::from_millis(base_ms.saturating_mul(multiplier))
216}
217
218fn map_observer_error(
219    status: reqwest::StatusCode,
220    body: &str,
221    retry_after_secs: Option<u64>,
222) -> CliError {
223    let message = error_message(body, status);
224    match status.as_u16() {
225        400 => CliError::UnsupportedPlatform(message),
226        401 => CliError::OidcRejected(message),
227        403 => {
228            if mentions_environment(body) {
229                CliError::EnvironmentNotBound(message)
230            } else {
231                CliError::RepoNotAllowlisted(message)
232            }
233        }
234        404 => CliError::PlanNotFound(message),
235        409 => CliError::AmbiguousBinding(message),
236        429 => CliError::RateLimited {
237            retry_after_secs: retry_after_secs.unwrap_or(30),
238        },
239        500..=599 => CliError::BackendUnavailable(status.to_string()),
240        _ => CliError::Other(message),
241    }
242}
243
244fn error_message(body: &str, status: reqwest::StatusCode) -> String {
245    if let Ok(value) = serde_json::from_str::<serde_json::Value>(body) {
246        for key in ["detail", "error", "message"] {
247            if let Some(text) = value.get(key).and_then(|v| v.as_str()) {
248                if !text.is_empty() {
249                    return text.to_string();
250                }
251            }
252        }
253    }
254    if body.trim().is_empty() {
255        format!("HTTP {status}")
256    } else {
257        body.trim().to_string()
258    }
259}
260
261fn mentions_environment(text: &str) -> bool {
262    text.to_ascii_lowercase().contains("environment")
263}
264
265fn parse_retry_after_header(headers: &reqwest::header::HeaderMap) -> Option<u64> {
266    headers
267        .get(reqwest::header::RETRY_AFTER)
268        .and_then(|v| v.to_str().ok())
269        .and_then(|s| s.trim().parse::<u64>().ok())
270}