Skip to main content

ralph_workflow/cloud/io/
http.rs

1use crate::cloud::types::{interpret_http_response, CloudError, PipelineResult, ProgressUpdate};
2use crate::cloud::CloudReporter;
3use crate::config::types::CloudConfig;
4
5pub struct HttpCloudReporter {
6    config: CloudConfig,
7}
8
9impl HttpCloudReporter {
10    #[must_use]
11    pub const fn new(config: CloudConfig) -> Self {
12        Self { config }
13    }
14
15    pub(crate) fn build_url(api_url: &str, path: &str) -> Result<String, CloudError> {
16        let base = api_url.trim();
17        if !base.to_ascii_lowercase().starts_with("https://") {
18            return Err(CloudError::Configuration(
19                "Cloud API URL must use https://".to_string(),
20            ));
21        }
22
23        let base = base.trim_end_matches('/');
24        let path = path.trim_start_matches('/');
25
26        if path.is_empty() {
27            return Ok(base.to_string());
28        }
29
30        Ok(format!("{base}/{path}"))
31    }
32
33    pub(crate) fn post_json<T: serde::Serialize>(
34        &self,
35        path: &str,
36        body: &T,
37    ) -> Result<(), CloudError> {
38        let (url, api_token) = self.extract_credentials(path)?;
39        let json_body =
40            serde_json::to_value(body).map_err(|e| CloudError::Serialization(e.to_string()))?;
41        let (status, body) = perform_request(&url, &api_token, json_body)?;
42        interpret_http_response(status, body)
43    }
44
45    fn extract_credentials(&self, path: &str) -> Result<(String, String), CloudError> {
46        let api_url = self
47            .config
48            .api_url
49            .as_ref()
50            .ok_or_else(|| CloudError::Configuration("API URL not configured".to_string()))?;
51        let api_token = self
52            .config
53            .api_token
54            .as_ref()
55            .ok_or_else(|| CloudError::Configuration("API token not configured".to_string()))?;
56        let url = Self::build_url(api_url, path)?;
57        Ok((url, api_token.clone()))
58    }
59}
60
61fn perform_request(
62    url: &str,
63    api_token: &str,
64    json_body: serde_json::Value,
65) -> Result<(u16, String), CloudError> {
66    let agent = ureq::Agent::new_with_config(
67        ureq::config::Config::builder()
68            .timeout_global(Some(std::time::Duration::from_secs(30)))
69            .http_status_as_error(false)
70            .build(),
71    );
72    let response = agent
73        .post(url)
74        .header("Authorization", &format!("Bearer {api_token}"))
75        .header("Content-Type", "application/json")
76        .send_json(json_body);
77
78    match response {
79        Ok(resp) => {
80            let status = resp.status().as_u16();
81            let body = resp
82                .into_body()
83                .read_to_string()
84                .map_err(|e| CloudError::NetworkError(e.to_string()))?;
85            Ok((status, body))
86        }
87        Err(e) => Err(CloudError::NetworkError(e.to_string())),
88    }
89}
90
91impl CloudReporter for HttpCloudReporter {
92    fn report_progress(&self, update: &ProgressUpdate) -> Result<(), CloudError> {
93        let run_id = self
94            .config
95            .run_id
96            .as_ref()
97            .ok_or_else(|| CloudError::Configuration("Run ID not configured".to_string()))?;
98
99        let path = format!("runs/{run_id}/progress");
100        self.post_json(&path, update)
101    }
102
103    fn heartbeat(&self) -> Result<(), CloudError> {
104        let run_id = self
105            .config
106            .run_id
107            .as_ref()
108            .ok_or_else(|| CloudError::Configuration("Run ID not configured".to_string()))?;
109
110        let path = format!("runs/{run_id}/heartbeat");
111        let body = serde_json::json!({
112            "timestamp": chrono::Utc::now().to_rfc3339(),
113        });
114        self.post_json(&path, &body)
115    }
116
117    fn report_completion(&self, result: &PipelineResult) -> Result<(), CloudError> {
118        let run_id = self
119            .config
120            .run_id
121            .as_ref()
122            .ok_or_else(|| CloudError::Configuration("Run ID not configured".to_string()))?;
123
124        let path = format!("runs/{run_id}/complete");
125        self.post_json(&path, result)
126    }
127}
128
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::types::CloudConfig;

    /// A trailing slash on the base and a leading slash on the path must
    /// collapse into a single separator.
    #[test]
    fn test_build_url_trims_slashes_and_joins_paths() {
        let base = "https://api.example.com/v1/";
        let url = HttpCloudReporter::build_url(base, "/runs/run_1/progress").unwrap();
        assert_eq!(
            url, "https://api.example.com/v1/runs/run_1/progress",
            "URL join should avoid double slashes"
        );
    }

    /// An empty (or slash-only) path returns the base URL without a trailing slash.
    #[test]
    fn test_build_url_empty_path_returns_base() {
        let url = HttpCloudReporter::build_url("https://api.example.com/", "").unwrap();
        assert_eq!(url, "https://api.example.com");

        let url = HttpCloudReporter::build_url("https://api.example.com/", "/").unwrap();
        assert_eq!(url, "https://api.example.com");
    }

    /// The scheme check is case-insensitive and tolerates surrounding whitespace.
    #[test]
    fn test_build_url_accepts_mixed_case_scheme_and_whitespace() {
        let url = HttpCloudReporter::build_url("  HTTPS://api.example.com  ", "runs/x").unwrap();
        assert_eq!(url, "HTTPS://api.example.com/runs/x");
    }

    /// Plain `http://` endpoints are rejected as a configuration error.
    #[test]
    fn test_build_url_rejects_non_https() {
        let err = HttpCloudReporter::build_url("http://api.example.com", "/runs/x").unwrap_err();
        assert!(
            matches!(err, CloudError::Configuration(_)),
            "expected Configuration error, got: {err:?}"
        );
    }

    /// A reporter built from a disabled configuration must fail to report
    /// rather than attempt a request.
    #[test]
    fn test_http_reporter_requires_config() {
        let config = CloudConfig::disabled();
        let reporter = HttpCloudReporter::new(config);

        let update = ProgressUpdate {
            timestamp: "2025-02-15T10:00:00Z".to_string(),
            phase: "Planning".to_string(),
            previous_phase: None,
            iteration: Some(1),
            total_iterations: Some(3),
            review_pass: None,
            total_review_passes: None,
            message: "Test".to_string(),
            event_type: crate::cloud::types::ProgressEventType::PipelineStarted,
        };

        assert!(reporter.report_progress(&update).is_err());
    }
}