// synth_ai_core/api/eval.rs
use std::time::{Duration, Instant};

use reqwest::header::{HeaderMap, HeaderValue};
use serde_json::Value;

use crate::http::HttpError;
use crate::polling::{calculate_backoff, BackoffConfig};
use crate::CoreError;

use super::client::SynthClient;
use super::types::{CancelRequest, EvalJobRequest, EvalJobStatus, EvalResult, JobSubmitResponse};
/// Root path for generic job-control endpoints (e.g. cancel).
const JOBS_ROOT: &str = "/api/jobs";

/// Endpoint used to create (submit) a new eval job.
const EVAL_CREATE_ENDPOINT: &str = "/api/jobs/eval";

/// Endpoint used to read eval job status, results, and traces.
const EVAL_ENDPOINT: &str = "/api/eval/jobs";
26pub struct EvalClient<'a> {
30 client: &'a SynthClient,
31}
32
33impl<'a> EvalClient<'a> {
34 pub(crate) fn new(client: &'a SynthClient) -> Self {
36 Self { client }
37 }
38
39 pub async fn submit(&self, request: EvalJobRequest) -> Result<String, CoreError> {
61 let worker_token = request.container_worker_token.clone();
62 let body = serde_json::to_value(&request)
63 .map_err(|e| CoreError::Validation(format!("failed to serialize request: {}", e)))?;
64 let response: JobSubmitResponse = if let Some(token) = worker_token {
65 let mut headers = HeaderMap::new();
66 headers.insert(
67 "X-SynthTunnel-Worker-Token",
68 HeaderValue::from_str(&token).map_err(|_| {
69 CoreError::Validation("invalid SynthTunnel worker token".to_string())
70 })?,
71 );
72 self.client
73 .http
74 .post_json_with_headers(EVAL_CREATE_ENDPOINT, &body, Some(headers))
75 .await
76 .map_err(map_http_error)?
77 } else {
78 self.client
79 .http
80 .post_json(EVAL_CREATE_ENDPOINT, &body)
81 .await
82 .map_err(map_http_error)?
83 };
84
85 Ok(response.job_id)
86 }
87
88 pub async fn submit_raw(&self, request: Value) -> Result<String, CoreError> {
90 let response: JobSubmitResponse = self
91 .client
92 .http
93 .post_json(EVAL_CREATE_ENDPOINT, &request)
94 .await
95 .map_err(map_http_error)?;
96
97 Ok(response.job_id)
98 }
99
100 pub async fn get_status(&self, job_id: &str) -> Result<EvalResult, CoreError> {
110 let path = format!("{}/{}", EVAL_ENDPOINT, job_id);
112 self.client
113 .http
114 .get(&path, None)
115 .await
116 .map_err(map_http_error)
117 }
118
119 pub async fn get_results(&self, job_id: &str) -> Result<Value, CoreError> {
123 let path = format!("{}/{}/results", EVAL_ENDPOINT, job_id);
124 self.client
125 .http
126 .get(&path, None)
127 .await
128 .map_err(map_http_error)
129 }
130
131 pub async fn download_traces(&self, job_id: &str) -> Result<Vec<u8>, CoreError> {
133 let path = format!("{}/{}/traces", EVAL_ENDPOINT, job_id);
134 self.client
135 .http
136 .get_bytes(&path, None)
137 .await
138 .map_err(map_http_error)
139 }
140
141 pub async fn poll_until_complete(
160 &self,
161 job_id: &str,
162 timeout_secs: f64,
163 interval_secs: f64,
164 ) -> Result<EvalResult, CoreError> {
165 let start = Instant::now();
166 let timeout = Duration::from_secs_f64(timeout_secs);
167 let base_interval_ms = (interval_secs * 1000.0) as u64;
168
169 let config = BackoffConfig::new(base_interval_ms, 60000, 4);
170
171 let mut consecutive_errors = 0u32;
172 let max_errors = 10u32;
173
174 loop {
175 if start.elapsed() > timeout {
177 return Err(CoreError::Timeout(format!(
178 "eval job {} did not complete within {:.0} seconds",
179 job_id, timeout_secs
180 )));
181 }
182
183 match self.get_status(job_id).await {
185 Ok(result) => {
186 consecutive_errors = 0;
187
188 if result.status.is_terminal() {
189 return Ok(result);
190 }
191
192 let delay = calculate_backoff(&config, 0);
194 tokio::time::sleep(delay).await;
195 }
196 Err(e) => {
197 consecutive_errors += 1;
198
199 if consecutive_errors >= max_errors {
200 return Err(CoreError::Internal(format!(
201 "too many consecutive errors polling eval job {}: {}",
202 job_id, e
203 )));
204 }
205
206 let delay = calculate_backoff(&config, consecutive_errors);
208 tokio::time::sleep(delay).await;
209 }
210 }
211 }
212 }
213
214 pub async fn cancel(&self, job_id: &str, reason: Option<String>) -> Result<Value, CoreError> {
221 let path = format!("{}/{}/cancel", JOBS_ROOT, job_id);
222 let body = serde_json::to_value(&CancelRequest { reason })
223 .unwrap_or(Value::Object(serde_json::Map::new()));
224
225 self.client
226 .http
227 .post_json(&path, &body)
228 .await
229 .map_err(map_http_error)
230 }
231
232 pub async fn query_workflow_state(&self, job_id: &str) -> Result<Value, CoreError> {
234 let path = format!("/api/jobs/{}/workflow-state", job_id);
235 self.client
236 .http
237 .get_json(&path, None)
238 .await
239 .map_err(map_http_error)
240 }
241
242 pub async fn list(
249 &self,
250 limit: Option<i32>,
251 status: Option<EvalJobStatus>,
252 ) -> Result<Vec<EvalResult>, CoreError> {
253 let mut params = vec![];
254
255 let limit_str;
256 if let Some(l) = limit {
257 limit_str = l.to_string();
258 params.push(("limit", limit_str.as_str()));
259 }
260
261 let status_str;
262 if let Some(s) = status {
263 status_str = s.as_str().to_string();
264 params.push(("status", status_str.as_str()));
265 }
266
267 let params_ref: Option<&[(&str, &str)]> = if params.is_empty() {
268 None
269 } else {
270 Some(¶ms)
271 };
272
273 self.client
274 .http
275 .get(EVAL_ENDPOINT, params_ref)
276 .await
277 .map_err(map_http_error)
278 }
279}
280
281fn map_http_error(e: HttpError) -> CoreError {
283 match e {
284 HttpError::Response(detail) => {
285 if detail.status == 401 || detail.status == 403 {
286 CoreError::Authentication(format!("authentication failed: {}", detail))
287 } else if detail.status == 429 {
288 CoreError::UsageLimit(crate::UsageLimitInfo::from_http_429("eval", &detail))
289 } else {
290 CoreError::HttpResponse(crate::HttpErrorInfo {
291 status: detail.status,
292 url: detail.url,
293 message: detail.message,
294 body_snippet: detail.body_snippet,
295 })
296 }
297 }
298 HttpError::Request(e) => CoreError::Http(e),
299 _ => CoreError::Internal(format!("{}", e)),
300 }
301}
302
#[cfg(test)]
mod tests {
    use super::*;

    /// Pin the endpoint constants so a route change has to be deliberate.
    #[test]
    fn test_eval_endpoint() {
        assert_eq!(JOBS_ROOT, "/api/jobs");
        assert_eq!(EVAL_CREATE_ENDPOINT, "/api/jobs/eval");
        assert_eq!(EVAL_ENDPOINT, "/api/eval/jobs");
    }
}