reflex/lifecycle/
cloud.rs1use std::path::{Path, PathBuf};
6use std::process::Stdio;
7use std::time::Duration;
8
9use async_trait::async_trait;
10use reqwest::Client as HttpClient;
11use tokio::process::Command;
12
13use super::error::{LifecycleError, LifecycleResult};
14
/// Per-request timeout applied to the HTTP client used for metadata-server queries.
const METADATA_TIMEOUT: Duration = Duration::from_millis(5_000);
/// Time limit for a single attempt of an external `gsutil`/`gcloud` command.
const CMD_TIMEOUT: Duration = Duration::from_secs(2 * 60);
/// Maximum number of attempts for an external command, counting the first try.
const CMD_RETRIES: usize = 3;
/// Fixed pause between consecutive attempts of an external command.
const CMD_RETRY_BACKOFF: Duration = Duration::from_millis(750);
19
20#[async_trait]
21pub trait CloudOps: Send + Sync {
23 async fn download_file(
25 &self,
26 container: &str,
27 object: &str,
28 dest: &Path,
29 ) -> LifecycleResult<()>;
30 async fn upload_file(&self, container: &str, object: &str, src: &Path) -> LifecycleResult<()>;
32 async fn stop_self(&self) -> LifecycleResult<()>;
34}
35
36pub struct GcpCloudOps {
38 gsutil_path: PathBuf,
39 gcloud_path: PathBuf,
40 http: HttpClient,
41}
42
43impl GcpCloudOps {
44 pub fn new() -> Self {
46 Self {
47 gsutil_path: PathBuf::from("gsutil"),
48 gcloud_path: PathBuf::from("gcloud"),
49 http: HttpClient::builder()
50 .timeout(METADATA_TIMEOUT)
51 .build()
52 .unwrap_or_else(|_| HttpClient::new()),
53 }
54 }
55
56 async fn run_command_with_retries(
57 &self,
58 program: &Path,
59 args: Vec<String>,
60 label: &str,
61 ) -> LifecycleResult<()> {
62 let mut attempt = 0usize;
63 loop {
64 attempt += 1;
65
66 let mut cmd = Command::new(program);
67 cmd.args(&args)
68 .kill_on_drop(true)
69 .stdout(Stdio::piped())
70 .stderr(Stdio::piped());
71
72 let child = cmd
73 .spawn()
74 .map_err(|e| LifecycleError::CloudError(format!("Failed to spawn {label}: {e}")))?;
75
76 let output = match tokio::time::timeout(CMD_TIMEOUT, child.wait_with_output()).await {
77 Ok(res) => res.map_err(|e| {
78 LifecycleError::CloudError(format!("Failed waiting for {label}: {e}"))
79 })?,
80 Err(_) => {
81 return Err(LifecycleError::CloudError(format!(
82 "{label} timed out after {:?}",
83 CMD_TIMEOUT
84 )));
85 }
86 };
87
88 if output.status.success() {
89 return Ok(());
90 }
91
92 let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string();
93 let err = LifecycleError::CloudError(format!("{label} failed: {stderr}"));
94
95 if attempt >= CMD_RETRIES {
96 return Err(err);
97 }
98
99 tokio::time::sleep(CMD_RETRY_BACKOFF).await;
100 }
101 }
102
103 async fn get_instance_metadata(&self, key: &str) -> LifecycleResult<String> {
104 let url = format!(
105 "http://metadata.google.internal/computeMetadata/v1/instance/{}",
106 key
107 );
108 let resp = self
109 .http
110 .get(&url)
111 .header("Metadata-Flavor", "Google")
112 .send()
113 .await
114 .map_err(|e| LifecycleError::CloudError(format!("Metadata request failed: {}", e)))?;
115
116 if !resp.status().is_success() {
117 return Err(LifecycleError::CloudError(format!(
118 "Metadata error: {}",
119 resp.status()
120 )));
121 }
122
123 let text = resp
124 .text()
125 .await
126 .map_err(|e| LifecycleError::CloudError(format!("Failed to read metadata: {}", e)))?;
127
128 Ok(text.trim().to_string())
129 }
130
131 async fn stop_instance(&self, zone: &str, instance: &str) -> LifecycleResult<()> {
132 self.run_command_with_retries(
133 &self.gcloud_path,
134 vec![
135 "compute".to_string(),
136 "instances".to_string(),
137 "stop".to_string(),
138 instance.to_string(),
139 "--zone".to_string(),
140 zone.to_string(),
141 "--quiet".to_string(),
142 ],
143 "gcloud stop",
144 )
145 .await
146 }
147}
148
149impl Default for GcpCloudOps {
150 fn default() -> Self {
151 Self::new()
152 }
153}
154
155#[async_trait]
156impl CloudOps for GcpCloudOps {
157 async fn download_file(
158 &self,
159 container: &str,
160 object: &str,
161 dest: &Path,
162 ) -> LifecycleResult<()> {
163 let uri = format!("gs://{}/{}", container, object);
164 self.run_command_with_retries(
165 &self.gsutil_path,
166 vec!["cp".to_string(), uri, dest.to_string_lossy().to_string()],
167 "gsutil cp (download)",
168 )
169 .await
170 }
171
172 async fn upload_file(&self, container: &str, object: &str, src: &Path) -> LifecycleResult<()> {
173 let uri = format!("gs://{}/{}", container, object);
174 self.run_command_with_retries(
175 &self.gsutil_path,
176 vec!["cp".to_string(), src.to_string_lossy().to_string(), uri],
177 "gsutil cp (upload)",
178 )
179 .await
180 }
181
182 async fn stop_self(&self) -> LifecycleResult<()> {
183 let zone_full = self.get_instance_metadata("zone").await?;
184 let instance = self.get_instance_metadata("name").await?;
185 let zone = zone_full.rsplit('/').next().unwrap_or(&zone_full);
186 self.stop_instance(zone, &instance).await
187 }
188}
189
/// Filesystem-backed stand-in for [`CloudOps`], intended for local runs and tests.
///
/// It is stateless, so it is cheap to copy.
#[derive(Debug, Clone, Copy)]
pub struct LocalCloudOps;
192
193impl LocalCloudOps {
194 pub fn new() -> Self {
196 Self
197 }
198}
199
200impl Default for LocalCloudOps {
201 fn default() -> Self {
202 Self::new()
203 }
204}
205
206#[async_trait]
207impl CloudOps for LocalCloudOps {
208 async fn download_file(
209 &self,
210 container: &str,
211 object: &str,
212 dest: &Path,
213 ) -> LifecycleResult<()> {
214 let remote_dir = std::env::temp_dir()
215 .join("reflex_cloud_mock")
216 .join(container);
217 let src = remote_dir.join(object);
218
219 if !src.exists() {
220 return Err(LifecycleError::CloudError(format!(
221 "Local object not found: {:?}",
222 src
223 )));
224 }
225
226 tokio::fs::copy(&src, dest).await?;
227 Ok(())
228 }
229
230 async fn upload_file(&self, container: &str, object: &str, src: &Path) -> LifecycleResult<()> {
231 let remote_dir = std::env::temp_dir()
232 .join("reflex_cloud_mock")
233 .join(container);
234 tokio::fs::create_dir_all(&remote_dir).await?;
235 let dest = remote_dir.join(object);
236
237 tokio::fs::copy(src, &dest).await?;
238 Ok(())
239 }
240
241 async fn stop_self(&self) -> LifecycleResult<()> {
242 println!("LocalCloudOps: Stop requested (simulated).");
243 Ok(())
244 }
245}