claw_spawn/infrastructure/
digital_ocean.rs1use crate::domain::{Droplet, DropletCreateRequest};
2use reqwest::{header, Client};
3use serde_json::json;
4use std::time::Duration;
5use thiserror::Error;
6use tokio::time::sleep;
7
8#[derive(Error, Debug)]
9pub enum DigitalOceanError {
10 #[error("API request failed: {0}")]
11 RequestFailed(String),
12 #[error("Droplet creation failed: {0}")]
13 CreationFailed(String),
14 #[error("Droplet not found: {0}")]
15 NotFound(i64),
16 #[error("Rate limited")]
17 RateLimited,
18 #[error("Invalid response: {0}")]
19 InvalidResponse(String),
20 #[error("Invalid configuration: {0}")]
21 InvalidConfig(String),
22 #[error("Max retries exceeded for DO API call")]
23 MaxRetriesExceeded,
24}
25
26const MAX_RETRIES: u32 = 3;
28const INITIAL_BACKOFF_MS: u64 = 1000;
29
30fn is_retryable_status(status: u16) -> bool {
32 matches!(status, 500 | 502 | 503)
33}
34
35pub struct DigitalOceanClient {
36 client: Client,
37 base_url: String,
38}
39
40impl DigitalOceanClient {
41 pub fn new(api_token: String) -> Result<Self, DigitalOceanError> {
42 let mut headers = header::HeaderMap::new();
43 let auth_value = match header::HeaderValue::from_str(&format!("Bearer {}", api_token)) {
44 Ok(val) => val,
45 Err(e) => {
46 return Err(DigitalOceanError::InvalidConfig(format!(
47 "Invalid API token format: {}",
48 e
49 )))
50 }
51 };
52 headers.insert(header::AUTHORIZATION, auth_value);
53 headers.insert(
54 header::CONTENT_TYPE,
55 header::HeaderValue::from_static("application/json"),
56 );
57
58 let client = Client::builder()
59 .default_headers(headers)
60 .timeout(Duration::from_secs(30))
62 .connect_timeout(Duration::from_secs(10))
63 .pool_idle_timeout(Duration::from_secs(90))
64 .build()
65 .map_err(|e| {
66 DigitalOceanError::InvalidConfig(format!("Failed to create HTTP client: {}", e))
67 })?;
68
69 Ok(Self {
70 client,
71 base_url: "https://api.digitalocean.com/v2".to_string(),
72 })
73 }
74
75 pub async fn create_droplet(
76 &self,
77 request: DropletCreateRequest,
78 ) -> Result<Droplet, DigitalOceanError> {
79 let body = json!({
80 "name": request.name,
81 "region": request.region,
82 "size": request.size,
83 "image": request.image,
84 "user_data": request.user_data,
85 "tags": request.tags,
86 "monitoring": true,
87 "ipv6": false,
88 "backups": false,
89 });
90
91 let mut last_error = None;
92 for attempt in 0..MAX_RETRIES {
93 let response = self
94 .client
95 .post(format!("{}/droplets", self.base_url))
96 .json(&body)
97 .send()
98 .await;
99
100 match response {
101 Ok(resp) => {
102 let status = resp.status().as_u16();
103
104 if status == 429 {
105 return Err(DigitalOceanError::RateLimited);
106 }
107
108 if is_retryable_status(status) && attempt < MAX_RETRIES - 1 {
110 let backoff = INITIAL_BACKOFF_MS * 2_u64.pow(attempt);
111 sleep(Duration::from_millis(backoff)).await;
112 continue;
113 }
114
115 if !resp.status().is_success() {
116 let error_text = resp
117 .text()
118 .await
119 .unwrap_or_else(|_| "Unknown error".to_string());
120 return Err(DigitalOceanError::CreationFailed(error_text));
121 }
122
123 let json_response: serde_json::Value = resp
124 .json()
125 .await
126 .map_err(|e| DigitalOceanError::InvalidResponse(e.to_string()))?;
127
128 let droplet_data = json_response.get("droplet").ok_or_else(|| {
129 DigitalOceanError::InvalidResponse("Missing droplet field".to_string())
130 })?;
131
132 let do_response: crate::domain::DigitalOceanDropletResponse =
133 serde_json::from_value(droplet_data.clone())
134 .map_err(|e| DigitalOceanError::InvalidResponse(e.to_string()))?;
135
136 return Ok(Droplet::from_do_response(do_response));
137 }
138 Err(e) => {
139 last_error = Some(e);
140 if attempt < MAX_RETRIES - 1 {
141 let backoff = INITIAL_BACKOFF_MS * 2_u64.pow(attempt);
142 sleep(Duration::from_millis(backoff)).await;
143 }
144 }
145 }
146 }
147
148 Err(DigitalOceanError::RequestFailed(
149 last_error
150 .map(|e| e.to_string())
151 .unwrap_or_else(|| "Max retries exceeded".to_string()),
152 ))
153 }
154
155 pub async fn get_droplet(&self, droplet_id: i64) -> Result<Droplet, DigitalOceanError> {
156 let mut last_error = None;
157 for attempt in 0..MAX_RETRIES {
158 let response = self
159 .client
160 .get(format!("{}/droplets/{}", self.base_url, droplet_id))
161 .send()
162 .await;
163
164 match response {
165 Ok(resp) => {
166 let status = resp.status().as_u16();
167
168 if status == 429 {
169 return Err(DigitalOceanError::RateLimited);
170 }
171
172 if status == 404 {
173 return Err(DigitalOceanError::NotFound(droplet_id));
174 }
175
176 if is_retryable_status(status) && attempt < MAX_RETRIES - 1 {
178 let backoff = INITIAL_BACKOFF_MS * 2_u64.pow(attempt);
179 sleep(Duration::from_millis(backoff)).await;
180 continue;
181 }
182
183 if !resp.status().is_success() {
184 let error_text = resp
185 .text()
186 .await
187 .unwrap_or_else(|_| "Unknown error".to_string());
188 return Err(DigitalOceanError::RequestFailed(error_text));
189 }
190
191 let json_response: serde_json::Value = resp
192 .json()
193 .await
194 .map_err(|e| DigitalOceanError::InvalidResponse(e.to_string()))?;
195
196 let droplet_data = json_response.get("droplet").ok_or_else(|| {
197 DigitalOceanError::InvalidResponse("Missing droplet field".to_string())
198 })?;
199
200 let do_response: crate::domain::DigitalOceanDropletResponse =
201 serde_json::from_value(droplet_data.clone())
202 .map_err(|e| DigitalOceanError::InvalidResponse(e.to_string()))?;
203
204 return Ok(Droplet::from_do_response(do_response));
205 }
206 Err(e) => {
207 last_error = Some(e);
208 if attempt < MAX_RETRIES - 1 {
209 let backoff = INITIAL_BACKOFF_MS * 2_u64.pow(attempt);
210 sleep(Duration::from_millis(backoff)).await;
211 }
212 }
213 }
214 }
215
216 Err(DigitalOceanError::RequestFailed(
217 last_error
218 .map(|e| e.to_string())
219 .unwrap_or_else(|| "Max retries exceeded".to_string()),
220 ))
221 }
222
223 pub async fn destroy_droplet(&self, droplet_id: i64) -> Result<(), DigitalOceanError> {
224 let mut last_error = None;
225 for attempt in 0..MAX_RETRIES {
226 let response = self
227 .client
228 .delete(format!("{}/droplets/{}", self.base_url, droplet_id))
229 .send()
230 .await;
231
232 match response {
233 Ok(resp) => {
234 let status = resp.status().as_u16();
235
236 if status == 429 {
237 return Err(DigitalOceanError::RateLimited);
238 }
239
240 if status == 404 {
241 return Err(DigitalOceanError::NotFound(droplet_id));
242 }
243
244 if is_retryable_status(status) && attempt < MAX_RETRIES - 1 {
246 let backoff = INITIAL_BACKOFF_MS * 2_u64.pow(attempt);
247 sleep(Duration::from_millis(backoff)).await;
248 continue;
249 }
250
251 if !resp.status().is_success() {
252 let error_text = resp
253 .text()
254 .await
255 .unwrap_or_else(|_| "Unknown error".to_string());
256 return Err(DigitalOceanError::RequestFailed(error_text));
257 }
258
259 return Ok(());
260 }
261 Err(e) => {
262 last_error = Some(e);
263 if attempt < MAX_RETRIES - 1 {
264 let backoff = INITIAL_BACKOFF_MS * 2_u64.pow(attempt);
265 sleep(Duration::from_millis(backoff)).await;
266 }
267 }
268 }
269 }
270
271 Err(DigitalOceanError::RequestFailed(
272 last_error
273 .map(|e| e.to_string())
274 .unwrap_or_else(|| "Max retries exceeded".to_string()),
275 ))
276 }
277
278 pub async fn shutdown_droplet(&self, droplet_id: i64) -> Result<(), DigitalOceanError> {
279 let body = json!({
280 "type": "shutdown",
281 });
282
283 let mut last_error = None;
284 for attempt in 0..MAX_RETRIES {
285 let response = self
286 .client
287 .post(format!("{}/droplets/{}/actions", self.base_url, droplet_id))
288 .json(&body)
289 .send()
290 .await;
291
292 match response {
293 Ok(resp) => {
294 let status = resp.status().as_u16();
295
296 if status == 429 {
297 return Err(DigitalOceanError::RateLimited);
298 }
299
300 if is_retryable_status(status) && attempt < MAX_RETRIES - 1 {
302 let backoff = INITIAL_BACKOFF_MS * 2_u64.pow(attempt);
303 sleep(Duration::from_millis(backoff)).await;
304 continue;
305 }
306
307 if !resp.status().is_success() {
308 let error_text = resp
309 .text()
310 .await
311 .unwrap_or_else(|_| "Unknown error".to_string());
312 return Err(DigitalOceanError::RequestFailed(error_text));
313 }
314
315 return Ok(());
316 }
317 Err(e) => {
318 last_error = Some(e);
319 if attempt < MAX_RETRIES - 1 {
320 let backoff = INITIAL_BACKOFF_MS * 2_u64.pow(attempt);
321 sleep(Duration::from_millis(backoff)).await;
322 }
323 }
324 }
325 }
326
327 Err(DigitalOceanError::RequestFailed(
328 last_error
329 .map(|e| e.to_string())
330 .unwrap_or_else(|| "Max retries exceeded".to_string()),
331 ))
332 }
333
334 pub async fn reboot_droplet(&self, droplet_id: i64) -> Result<(), DigitalOceanError> {
335 let body = json!({
336 "type": "reboot",
337 });
338
339 let mut last_error = None;
340 for attempt in 0..MAX_RETRIES {
341 let response = self
342 .client
343 .post(format!("{}/droplets/{}/actions", self.base_url, droplet_id))
344 .json(&body)
345 .send()
346 .await;
347
348 match response {
349 Ok(resp) => {
350 let status = resp.status().as_u16();
351
352 if status == 429 {
353 return Err(DigitalOceanError::RateLimited);
354 }
355
356 if is_retryable_status(status) && attempt < MAX_RETRIES - 1 {
358 let backoff = INITIAL_BACKOFF_MS * 2_u64.pow(attempt);
359 sleep(Duration::from_millis(backoff)).await;
360 continue;
361 }
362
363 if !resp.status().is_success() {
364 let error_text = resp
365 .text()
366 .await
367 .unwrap_or_else(|_| "Unknown error".to_string());
368 return Err(DigitalOceanError::RequestFailed(error_text));
369 }
370
371 return Ok(());
372 }
373 Err(e) => {
374 last_error = Some(e);
375 if attempt < MAX_RETRIES - 1 {
376 let backoff = INITIAL_BACKOFF_MS * 2_u64.pow(attempt);
377 sleep(Duration::from_millis(backoff)).await;
378 }
379 }
380 }
381 }
382
383 Err(DigitalOceanError::RequestFailed(
384 last_error
385 .map(|e| e.to_string())
386 .unwrap_or_else(|| "Max retries exceeded".to_string()),
387 ))
388 }
389}