Skip to main content

codetether_agent/ralph/
store_http.rs

1//! HTTP implementation of RalphStateStore
2//!
3//! Posts state updates to the Python A2A server's Ralph API,
4//! bridging the Rust agent's execution state to the Postgres-backed dashboard.
5
6use async_trait::async_trait;
7use reqwest::Client;
8use serde_json::json;
9use tracing::{debug, warn};
10
11use super::state_store::{RalphRunState, RalphRunSummary, RalphStateStore, StoryResultEntry};
12use super::types::{Prd, ProgressEntry, RalphStatus};
13
14/// HTTP-backed state store that talks to the Python A2A server
15pub struct HttpStore {
16    client: Client,
17    base_url: String,
18    api_key: Option<String>,
19}
20
21impl HttpStore {
22    /// Create from environment variables.
23    ///
24    /// - `CODETETHER_API_URL` — base URL (default: `http://localhost:8080`)
25    /// - `CODETETHER_API_KEY` — optional Bearer token
26    pub fn from_env() -> Self {
27        let base_url = std::env::var("CODETETHER_API_URL")
28            .unwrap_or_else(|_| "http://localhost:8080".to_string());
29        let api_key = std::env::var("CODETETHER_API_KEY")
30            .ok()
31            .filter(|s| !s.is_empty());
32
33        Self {
34            client: Client::builder()
35                .timeout(std::time::Duration::from_secs(10))
36                .build()
37                .unwrap_or_default(),
38            base_url: base_url.trim_end_matches('/').to_string(),
39            api_key,
40        }
41    }
42
43    /// Create with explicit base URL and optional API key
44    pub fn new(base_url: &str, api_key: Option<String>) -> Self {
45        Self {
46            client: Client::builder()
47                .timeout(std::time::Duration::from_secs(10))
48                .build()
49                .unwrap_or_default(),
50            base_url: base_url.trim_end_matches('/').to_string(),
51            api_key,
52        }
53    }
54
55    fn apply_auth(&self, req: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
56        if let Some(ref key) = self.api_key {
57            req.bearer_auth(key)
58        } else {
59            req
60        }
61    }
62
63    fn status_str(status: RalphStatus) -> &'static str {
64        match status {
65            RalphStatus::Pending => "pending",
66            RalphStatus::Running => "running",
67            RalphStatus::Completed => "completed",
68            RalphStatus::MaxIterations => "max_iterations",
69            RalphStatus::Stopped => "stopped",
70            RalphStatus::QualityFailed => "quality_failed",
71        }
72    }
73}
74
75#[async_trait]
76impl RalphStateStore for HttpStore {
77    async fn create_run(&self, state: &RalphRunState) -> anyhow::Result<()> {
78        let url = format!("{}/v1/ralph/runs", self.base_url);
79        let body = json!({
80            "id": state.run_id,
81            "prd": state.prd,
82            "workspace_id": state.okr_id,
83            "model": state.config.model,
84            "status": Self::status_str(state.status),
85            "max_iterations": state.max_iterations,
86            "run_mode": if state.config.parallel_enabled { "parallel" } else { "sequential" },
87            "max_parallel": state.config.max_concurrent_stories,
88        });
89
90        let req = self.apply_auth(self.client.post(&url)).json(&body);
91        match req.send().await {
92            Ok(resp) if resp.status().is_success() => {
93                debug!(run_id = %state.run_id, "Created run in HTTP store");
94            }
95            Ok(resp) => {
96                warn!(
97                    run_id = %state.run_id,
98                    status = %resp.status(),
99                    "HTTP store create_run failed"
100                );
101            }
102            Err(e) => {
103                warn!(run_id = %state.run_id, error = %e, "HTTP store create_run error");
104            }
105        }
106        Ok(())
107    }
108
109    async fn update_status(&self, run_id: &str, status: RalphStatus) -> anyhow::Result<()> {
110        let url = format!("{}/v1/ralph/runs/{}", self.base_url, run_id);
111        let mut body = json!({ "status": Self::status_str(status) });
112        if status == RalphStatus::Running {
113            body["started_at"] = json!(chrono::Utc::now().to_rfc3339());
114        }
115
116        let req = self.apply_auth(self.client.put(&url)).json(&body);
117        if let Err(e) = req.send().await {
118            warn!(run_id = %run_id, error = %e, "HTTP store update_status error");
119        }
120        Ok(())
121    }
122
123    async fn update_iteration(&self, run_id: &str, iteration: usize) -> anyhow::Result<()> {
124        let url = format!("{}/v1/ralph/runs/{}", self.base_url, run_id);
125        let body = json!({ "current_iteration": iteration });
126
127        let req = self.apply_auth(self.client.put(&url)).json(&body);
128        if let Err(e) = req.send().await {
129            warn!(run_id = %run_id, error = %e, "HTTP store update_iteration error");
130        }
131        Ok(())
132    }
133
134    async fn record_story_result(
135        &self,
136        run_id: &str,
137        result: &StoryResultEntry,
138    ) -> anyhow::Result<()> {
139        let url = format!(
140            "{}/v1/ralph/runs/{}/stories/{}",
141            self.base_url, run_id, result.story_id
142        );
143        let body = json!({
144            "story_id": result.story_id,
145            "title": result.title,
146            "passed": result.passed,
147            "iteration": result.iteration,
148            "error": result.error,
149        });
150
151        let req = self.apply_auth(self.client.put(&url)).json(&body);
152        if let Err(e) = req.send().await {
153            warn!(run_id = %run_id, story = %result.story_id, error = %e, "HTTP store record_story_result error");
154        }
155        Ok(())
156    }
157
158    async fn append_progress(&self, run_id: &str, entry: &ProgressEntry) -> anyhow::Result<()> {
159        // Append progress as a log entry via the update endpoint
160        let url = format!("{}/v1/ralph/runs/{}", self.base_url, run_id);
161        let body = json!({
162            "logs": [entry],
163        });
164
165        let req = self.apply_auth(self.client.put(&url)).json(&body);
166        if let Err(e) = req.send().await {
167            warn!(run_id = %run_id, error = %e, "HTTP store append_progress error");
168        }
169        Ok(())
170    }
171
172    async fn update_prd(&self, run_id: &str, prd: &Prd) -> anyhow::Result<()> {
173        let url = format!("{}/v1/ralph/runs/{}", self.base_url, run_id);
174        let body = json!({ "prd": prd });
175
176        let req = self.apply_auth(self.client.put(&url)).json(&body);
177        if let Err(e) = req.send().await {
178            warn!(run_id = %run_id, error = %e, "HTTP store update_prd error");
179        }
180        Ok(())
181    }
182
183    async fn set_error(&self, run_id: &str, error: &str) -> anyhow::Result<()> {
184        let url = format!("{}/v1/ralph/runs/{}", self.base_url, run_id);
185        let body = json!({ "error": error });
186
187        let req = self.apply_auth(self.client.put(&url)).json(&body);
188        if let Err(e) = req.send().await {
189            warn!(run_id = %run_id, error = %e, "HTTP store set_error error");
190        }
191        Ok(())
192    }
193
194    async fn complete_run(&self, run_id: &str, status: RalphStatus) -> anyhow::Result<()> {
195        let url = format!("{}/v1/ralph/runs/{}", self.base_url, run_id);
196        let body = json!({
197            "status": Self::status_str(status),
198            "completed_at": chrono::Utc::now().to_rfc3339(),
199        });
200
201        let req = self.apply_auth(self.client.put(&url)).json(&body);
202        if let Err(e) = req.send().await {
203            warn!(run_id = %run_id, error = %e, "HTTP store complete_run error");
204        }
205        Ok(())
206    }
207
208    async fn get_run(&self, run_id: &str) -> anyhow::Result<Option<RalphRunState>> {
209        let url = format!("{}/v1/ralph/runs/{}", self.base_url, run_id);
210        let req = self.apply_auth(self.client.get(&url));
211
212        match req.send().await {
213            Ok(resp) if resp.status().is_success() => match resp.json::<RalphRunState>().await {
214                Ok(state) => Ok(Some(state)),
215                Err(e) => {
216                    warn!(run_id = %run_id, error = %e, "HTTP store get_run parse error");
217                    Ok(None)
218                }
219            },
220            Ok(resp) if resp.status().as_u16() == 404 => Ok(None),
221            Ok(resp) => {
222                warn!(run_id = %run_id, status = %resp.status(), "HTTP store get_run failed");
223                Ok(None)
224            }
225            Err(e) => {
226                warn!(run_id = %run_id, error = %e, "HTTP store get_run error");
227                Ok(None)
228            }
229        }
230    }
231
232    async fn list_runs(&self) -> anyhow::Result<Vec<RalphRunSummary>> {
233        let url = format!("{}/v1/ralph/runs", self.base_url);
234        let req = self.apply_auth(self.client.get(&url));
235
236        match req.send().await {
237            Ok(resp) if resp.status().is_success() => {
238                match resp.json::<Vec<RalphRunSummary>>().await {
239                    Ok(runs) => Ok(runs),
240                    Err(e) => {
241                        warn!(error = %e, "HTTP store list_runs parse error");
242                        Ok(Vec::new())
243                    }
244                }
245            }
246            Ok(resp) => {
247                warn!(status = %resp.status(), "HTTP store list_runs failed");
248                Ok(Vec::new())
249            }
250            Err(e) => {
251                warn!(error = %e, "HTTP store list_runs error");
252                Ok(Vec::new())
253            }
254        }
255    }
256}