reflex/lifecycle/
cloud.rs

//! Cloud storage operations used by lifecycle management.
//!
//! `GcpCloudOps` shells out to `gsutil`/`gcloud` and queries the GCE metadata server;
//! `LocalCloudOps` is a filesystem-backed mock.
4
5use std::path::{Path, PathBuf};
6use std::process::Stdio;
7use std::time::Duration;
8
9use async_trait::async_trait;
10use reqwest::Client as HttpClient;
11use tokio::process::Command;
12
13use super::error::{LifecycleError, LifecycleResult};
14
/// Time limit applied to metadata-server HTTP requests.
const METADATA_TIMEOUT: Duration = Duration::from_secs(5);
/// Time limit for a single invocation of an external CLI (`gsutil`/`gcloud`).
const CMD_TIMEOUT: Duration = Duration::from_secs(2 * 60);
/// Total number of attempts for an external command, counting the first one.
const CMD_RETRIES: usize = 3;
/// Fixed pause inserted between consecutive command attempts.
const CMD_RETRY_BACKOFF: Duration = Duration::from_millis(750);
19
20#[async_trait]
21/// Cloud operations required for hydration/dehydration and self-stop.
22pub trait CloudOps: Send + Sync {
23    /// Downloads `object` from `container` to `dest`.
24    async fn download_file(
25        &self,
26        container: &str,
27        object: &str,
28        dest: &Path,
29    ) -> LifecycleResult<()>;
30    /// Uploads `src` to `container/object`.
31    async fn upload_file(&self, container: &str, object: &str, src: &Path) -> LifecycleResult<()>;
32    /// Attempts to stop the current instance.
33    async fn stop_self(&self) -> LifecycleResult<()>;
34}
35
36/// Google Cloud Storage / Compute Engine implementation.
37pub struct GcpCloudOps {
38    gsutil_path: PathBuf,
39    gcloud_path: PathBuf,
40    http: HttpClient,
41}
42
43impl GcpCloudOps {
44    /// Creates a new GCP implementation using `gsutil` and `gcloud` from `PATH`.
45    pub fn new() -> Self {
46        Self {
47            gsutil_path: PathBuf::from("gsutil"),
48            gcloud_path: PathBuf::from("gcloud"),
49            http: HttpClient::builder()
50                .timeout(METADATA_TIMEOUT)
51                .build()
52                .unwrap_or_else(|_| HttpClient::new()),
53        }
54    }
55
56    async fn run_command_with_retries(
57        &self,
58        program: &Path,
59        args: Vec<String>,
60        label: &str,
61    ) -> LifecycleResult<()> {
62        let mut attempt = 0usize;
63        loop {
64            attempt += 1;
65
66            let mut cmd = Command::new(program);
67            cmd.args(&args)
68                .kill_on_drop(true)
69                .stdout(Stdio::piped())
70                .stderr(Stdio::piped());
71
72            let child = cmd
73                .spawn()
74                .map_err(|e| LifecycleError::CloudError(format!("Failed to spawn {label}: {e}")))?;
75
76            let output = match tokio::time::timeout(CMD_TIMEOUT, child.wait_with_output()).await {
77                Ok(res) => res.map_err(|e| {
78                    LifecycleError::CloudError(format!("Failed waiting for {label}: {e}"))
79                })?,
80                Err(_) => {
81                    return Err(LifecycleError::CloudError(format!(
82                        "{label} timed out after {:?}",
83                        CMD_TIMEOUT
84                    )));
85                }
86            };
87
88            if output.status.success() {
89                return Ok(());
90            }
91
92            let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string();
93            let err = LifecycleError::CloudError(format!("{label} failed: {stderr}"));
94
95            if attempt >= CMD_RETRIES {
96                return Err(err);
97            }
98
99            tokio::time::sleep(CMD_RETRY_BACKOFF).await;
100        }
101    }
102
103    async fn get_instance_metadata(&self, key: &str) -> LifecycleResult<String> {
104        let url = format!(
105            "http://metadata.google.internal/computeMetadata/v1/instance/{}",
106            key
107        );
108        let resp = self
109            .http
110            .get(&url)
111            .header("Metadata-Flavor", "Google")
112            .send()
113            .await
114            .map_err(|e| LifecycleError::CloudError(format!("Metadata request failed: {}", e)))?;
115
116        if !resp.status().is_success() {
117            return Err(LifecycleError::CloudError(format!(
118                "Metadata error: {}",
119                resp.status()
120            )));
121        }
122
123        let text = resp
124            .text()
125            .await
126            .map_err(|e| LifecycleError::CloudError(format!("Failed to read metadata: {}", e)))?;
127
128        Ok(text.trim().to_string())
129    }
130
131    async fn stop_instance(&self, zone: &str, instance: &str) -> LifecycleResult<()> {
132        self.run_command_with_retries(
133            &self.gcloud_path,
134            vec![
135                "compute".to_string(),
136                "instances".to_string(),
137                "stop".to_string(),
138                instance.to_string(),
139                "--zone".to_string(),
140                zone.to_string(),
141                "--quiet".to_string(),
142            ],
143            "gcloud stop",
144        )
145        .await
146    }
147}
148
149impl Default for GcpCloudOps {
150    fn default() -> Self {
151        Self::new()
152    }
153}
154
155#[async_trait]
156impl CloudOps for GcpCloudOps {
157    async fn download_file(
158        &self,
159        container: &str,
160        object: &str,
161        dest: &Path,
162    ) -> LifecycleResult<()> {
163        let uri = format!("gs://{}/{}", container, object);
164        self.run_command_with_retries(
165            &self.gsutil_path,
166            vec!["cp".to_string(), uri, dest.to_string_lossy().to_string()],
167            "gsutil cp (download)",
168        )
169        .await
170    }
171
172    async fn upload_file(&self, container: &str, object: &str, src: &Path) -> LifecycleResult<()> {
173        let uri = format!("gs://{}/{}", container, object);
174        self.run_command_with_retries(
175            &self.gsutil_path,
176            vec!["cp".to_string(), src.to_string_lossy().to_string(), uri],
177            "gsutil cp (upload)",
178        )
179        .await
180    }
181
182    async fn stop_self(&self) -> LifecycleResult<()> {
183        let zone_full = self.get_instance_metadata("zone").await?;
184        let instance = self.get_instance_metadata("name").await?;
185        let zone = zone_full.rsplit('/').next().unwrap_or(&zone_full);
186        self.stop_instance(zone, &instance).await
187    }
188}
189
/// Local filesystem mock implementation of [`CloudOps`].
///
/// "Remote" objects live under `<temp dir>/reflex_cloud_mock/<container>/<object>`; stopping
/// the instance is only simulated. `Default` is provided by a separate manual impl.
#[derive(Debug, Clone, Copy)]
pub struct LocalCloudOps;
192
193impl LocalCloudOps {
194    /// Creates a new local implementation.
195    pub fn new() -> Self {
196        Self
197    }
198}
199
200impl Default for LocalCloudOps {
201    fn default() -> Self {
202        Self::new()
203    }
204}
205
206#[async_trait]
207impl CloudOps for LocalCloudOps {
208    async fn download_file(
209        &self,
210        container: &str,
211        object: &str,
212        dest: &Path,
213    ) -> LifecycleResult<()> {
214        let remote_dir = std::env::temp_dir()
215            .join("reflex_cloud_mock")
216            .join(container);
217        let src = remote_dir.join(object);
218
219        if !src.exists() {
220            return Err(LifecycleError::CloudError(format!(
221                "Local object not found: {:?}",
222                src
223            )));
224        }
225
226        tokio::fs::copy(&src, dest).await?;
227        Ok(())
228    }
229
230    async fn upload_file(&self, container: &str, object: &str, src: &Path) -> LifecycleResult<()> {
231        let remote_dir = std::env::temp_dir()
232            .join("reflex_cloud_mock")
233            .join(container);
234        tokio::fs::create_dir_all(&remote_dir).await?;
235        let dest = remote_dir.join(object);
236
237        tokio::fs::copy(src, &dest).await?;
238        Ok(())
239    }
240
241    async fn stop_self(&self) -> LifecycleResult<()> {
242        println!("LocalCloudOps: Stop requested (simulated).");
243        Ok(())
244    }
245}