akribes_sdk/sub/bench.rs
1//! Sub-client for the akribes-server bench substrate.
2//!
3//! Wraps the per-script bench config CRUD, case CRUD + promote-from-execution,
4//! and the bench-run lifecycle (trigger / list / get / results /
5//! subscribe-events / cancel / delete / compare / tag-session). Two surfaces:
6//!
7//! - Project-scoped operations live on [`BenchClient`] (obtained via
8//! `client.project(id).bench()`). They take a script name + project_id
9//! together: `/projects/{id}/scripts/{name}/bench/...`.
10//! - Run-scoped operations (anything keyed on the global `bench_runs.id`) live
11//! on [`BenchRunsClient`] (obtained via `client.bench_runs()`). The same
12//! endpoints — `/bench-runs/{id}/...` — are reachable cross-project so
13//! they don't need a `project_id`.
14//!
//! Timestamps travel on the wire as RFC 3339 strings — see [`crate::models`].
16
17use std::sync::Arc;
18
19use serde::Serialize;
20use tokio::sync::{mpsc, oneshot};
21
22use crate::client::{AkribesClient, Inner};
23use crate::error::{AkribesError, Result};
24use crate::models::*;
25use crate::sub::events::{EventSubscription, stream_bench_run_events};
26
27// ── Project-scoped bench client ──────────────────────────────────────────────
28
29/// Bench operations rooted at a project + script. Obtained via
30/// `client.project(id).bench()`.
31#[derive(Clone, Debug)]
32pub struct BenchClient {
33 pub(crate) inner: Arc<Inner>,
34 pub(crate) project_id: i64,
35}
36
37impl BenchClient {
38 pub(crate) fn new(inner: Arc<Inner>, project_id: i64) -> Self {
39 Self { inner, project_id }
40 }
41
42 fn c(&self) -> AkribesClient {
43 AkribesClient {
44 inner: Arc::clone(&self.inner),
45 }
46 }
47
48 fn bench_url(&self, script_name: &str) -> String {
49 format!(
50 "{}/projects/{}/scripts/{}/bench",
51 self.inner.base_url,
52 self.project_id,
53 urlencoding::encode(script_name),
54 )
55 }
56
57 // ── Project-wide summary ────────────────────────────────────────────────
58
59 /// `GET /projects/{id}/benches` — one [`ProjectBenchSummary`] per
60 /// configured bench in the project. Each row joins the bench with its
61 /// script name, judge name (when set), total case count, and the
62 /// most-recent run's identity + mean headline score. Backs the
63 /// project-level evals landing page; mirrors the TS SDK's
64 /// `listProjectSummaries`. 404 → empty list.
65 pub async fn list_project_summaries(&self) -> Result<Vec<ProjectBenchSummary>> {
66 let url = format!(
67 "{}/projects/{}/benches",
68 self.inner.base_url, self.project_id
69 );
70 self.c().get_list(&url).await
71 }
72
73 // ── Bench config CRUD ───────────────────────────────────────────────────
74
75 /// `GET /projects/{id}/scripts/{name}/bench` — 404 → `Ok(None)`.
76 pub async fn get(&self, script_name: &str) -> Result<Option<Bench>> {
77 self.c().get_opt(&self.bench_url(script_name)).await
78 }
79
80 /// `POST /projects/{id}/scripts/{name}/bench` — create or update.
81 pub async fn create_or_update(
82 &self,
83 script_name: &str,
84 req: &CreateOrUpdateBenchRequest,
85 ) -> Result<Bench> {
86 self.c().post(&self.bench_url(script_name), req).await
87 }
88
89 /// `DELETE /projects/{id}/scripts/{name}/bench`. Returns `true` if a row
90 /// was deleted, `false` if absent.
91 pub async fn delete(&self, script_name: &str) -> Result<bool> {
92 self.c().delete(&self.bench_url(script_name)).await
93 }
94
95 /// `GET /projects/{id}/scripts/{name}/signature` — the parsed script
96 /// signature (inputs + outputs) plus named type defs. Returned as
97 /// `serde_json::Value` because the server emits an ad-hoc tagged shape
98 /// that doesn't have a stable Rust mirror; the studio + MCP both treat it
99 /// as a blob.
100 pub async fn get_signature(&self, script_name: &str) -> Result<serde_json::Value> {
101 let url = format!(
102 "{}/projects/{}/scripts/{}/signature",
103 self.inner.base_url,
104 self.project_id,
105 urlencoding::encode(script_name),
106 );
107 Ok(self
108 .c()
109 .get_opt::<serde_json::Value>(&url)
110 .await?
111 .unwrap_or(serde_json::json!({})))
112 }
113
114 /// `GET /projects/{id}/scripts/{name}/bench/contract-preview` — workflow +
115 /// judge signatures with structured `breaks` list. Returned as a `Value`
116 /// because the wire shape contains the (unstable, JSON-only) signature
117 /// representation.
118 pub async fn contract_preview(
119 &self,
120 script_name: &str,
121 judge_script_id: i64,
122 channel: Option<&str>,
123 ) -> Result<serde_json::Value> {
124 #[derive(Serialize)]
125 struct Q<'a> {
126 judge: i64,
127 #[serde(skip_serializing_if = "Option::is_none")]
128 channel: Option<&'a str>,
129 }
130 let base = format!("{}/contract-preview", self.bench_url(script_name));
131 let url = AkribesClient::url_with_query(
132 &base,
133 &Q {
134 judge: judge_script_id,
135 channel,
136 },
137 );
138 Ok(self
139 .c()
140 .get_opt::<serde_json::Value>(&url)
141 .await?
142 .unwrap_or(serde_json::json!({})))
143 }
144
145 // ── Cases ───────────────────────────────────────────────────────────────
146
147 /// `GET /projects/{id}/scripts/{name}/bench/cases`. 404 → empty list.
148 pub async fn list_cases(&self, script_name: &str) -> Result<Vec<BenchCase>> {
149 let url = format!("{}/cases", self.bench_url(script_name));
150 self.c().get_list(&url).await
151 }
152
153 /// `POST /projects/{id}/scripts/{name}/bench/cases` — form-builder create.
154 pub async fn create_case(
155 &self,
156 script_name: &str,
157 req: &CreateBenchCaseRequest,
158 ) -> Result<BenchCase> {
159 let url = format!("{}/cases", self.bench_url(script_name));
160 self.c().post(&url, req).await
161 }
162
163 /// `GET /projects/{id}/scripts/{name}/bench/cases/contract-drift`.
164 pub async fn case_contract_drift(&self, script_name: &str) -> Result<DriftReport> {
165 let url = format!("{}/cases/contract-drift", self.bench_url(script_name));
166 Ok(self
167 .c()
168 .get_opt::<DriftReport>(&url)
169 .await?
170 .unwrap_or(DriftReport {
171 drifted: Vec::new(),
172 script_version_id: None,
173 published_at: None,
174 published_by: None,
175 summary: String::new(),
176 }))
177 }
178
179 // ── Runs (project-scoped surface) ───────────────────────────────────────
180
181 /// `GET /projects/{id}/scripts/{name}/bench/runs` — paginated via
182 /// `limit`/`offset`.
183 pub async fn list_runs(
184 &self,
185 script_name: &str,
186 limit: Option<i64>,
187 offset: Option<i64>,
188 ) -> Result<Vec<BenchRun>> {
189 #[derive(Serialize)]
190 struct Q {
191 #[serde(skip_serializing_if = "Option::is_none")]
192 limit: Option<i64>,
193 #[serde(skip_serializing_if = "Option::is_none")]
194 offset: Option<i64>,
195 }
196 let base = format!("{}/runs", self.bench_url(script_name));
197 let url = AkribesClient::url_with_query(&base, &Q { limit, offset });
198 self.c().get_list(&url).await
199 }
200
201 /// `POST /projects/{id}/scripts/{name}/bench/runs` — trigger a run.
202 /// `case_ids` constrains the fan-out to a subset (partial run).
203 pub async fn trigger_run(
204 &self,
205 script_name: &str,
206 req: &TriggerBenchRunRequest,
207 ) -> Result<BenchRun> {
208 let url = format!("{}/runs", self.bench_url(script_name));
209 self.c().post(&url, req).await
210 }
211}
212
213// ── Run-scoped (cross-project) client ────────────────────────────────────────
214
215/// Operations keyed on a global `bench_runs.id`. These endpoints live under
216/// `/bench-runs/{id}/...` and don't need a project scope (the server resolves
217/// the owning project from the run row). Obtained via `client.bench_runs()`.
218#[derive(Clone, Debug)]
219pub struct BenchRunsClient {
220 pub(crate) inner: Arc<Inner>,
221}
222
223impl BenchRunsClient {
224 pub(crate) fn new(inner: Arc<Inner>) -> Self {
225 Self { inner }
226 }
227
228 fn c(&self) -> AkribesClient {
229 AkribesClient {
230 inner: Arc::clone(&self.inner),
231 }
232 }
233
234 fn run_url(&self, run_id: i64) -> String {
235 format!("{}/bench-runs/{}", self.inner.base_url, run_id)
236 }
237
238 /// `GET /bench-runs/{id}` — 404 → `Ok(None)`.
239 pub async fn get(&self, run_id: i64) -> Result<Option<BenchRun>> {
240 self.c().get_opt(&self.run_url(run_id)).await
241 }
242
243 /// `DELETE /bench-runs/{id}` — returns `()` (server emits 204 No Content).
244 /// Cancels the run first (best-effort) before dropping the row.
245 pub async fn delete(&self, run_id: i64) -> Result<()> {
246 // `AkribesClient::delete` swallows the body and reports a bool
247 // (deleted vs already-absent). The bench delete endpoint emits 204,
248 // which is "deleted" — we discard the bool to give consumers a clean
249 // `()` return.
250 self.c().delete(&self.run_url(run_id)).await?;
251 Ok(())
252 }
253
254 /// `GET /bench-runs/{id}/results`. 404 → empty list.
255 pub async fn list_results(&self, run_id: i64) -> Result<Vec<BenchResult>> {
256 let url = format!("{}/results", self.run_url(run_id));
257 self.c().get_list(&url).await
258 }
259
260 /// Subscribe to a bench run's **live** result stream over SSE
261 /// (`GET /bench-runs/{id}/events`, `Accept: text/event-stream`).
262 ///
263 /// Returns a receiver of typed [`BenchRunEvent`]s plus an
264 /// [`EventSubscription`] handle that cancels the background listener
265 /// on drop. The server emits a [`BenchRunEvent::Result`] per recorded
266 /// case, a [`BenchRunEvent::Lagged`] if the broadcast subscriber falls
267 /// behind, and a final [`BenchRunEvent::Terminal`] carrying the run's
268 /// terminal status — after which the channel closes.
269 ///
270 /// This awaits the SSE subscription being live on the server before
271 /// returning, so a non-2xx (e.g. 403 on a project the token can't
272 /// read, or the run id not resolving to a project) surfaces here
273 /// rather than as a silently-empty stream. Reuses the crate's shared
274 /// SSE byte-deframer and field parser (the same machinery behind
275 /// `events().event_stream` / `executions().run_stream`).
276 ///
277 /// Mirrors the TS SDK's `subscribeRunEvents`; the `terminal` frame is
278 /// surfaced as a typed variant so Rust callers can detect
279 /// end-of-stream without a side channel. Drop the returned
280 /// `EventSubscription` (or let it fall out of scope) to unsubscribe.
281 ///
282 /// ```no_run
283 /// # use akribes_sdk::AkribesClient;
284 /// # use akribes_sdk::models::BenchRunEvent;
285 /// # async fn example(client: AkribesClient) -> akribes_sdk::Result<()> {
286 /// let (mut rx, _sub) = client.bench_runs().subscribe_run_events(42).await?;
287 /// while let Some(evt) = rx.recv().await {
288 /// match evt {
289 /// BenchRunEvent::Result(r) => println!("case {} → {}", r.case_id, r.status),
290 /// BenchRunEvent::Lagged { dropped } => eprintln!("dropped {dropped}"),
291 /// BenchRunEvent::Terminal { status } => {
292 /// println!("run ended: {status}");
293 /// break;
294 /// }
295 /// }
296 /// }
297 /// # Ok(()) }
298 /// ```
299 pub async fn subscribe_run_events(
300 &self,
301 run_id: i64,
302 ) -> Result<(mpsc::UnboundedReceiver<BenchRunEvent>, EventSubscription)> {
303 let (tx, rx) = mpsc::unbounded_channel();
304 let (ready_tx, ready_rx) = oneshot::channel::<Result<()>>();
305 let http = self.inner.http.clone();
306 let token = self.inner.token.clone();
307 let base_url = self.inner.base_url.clone();
308
309 let handle = tokio::spawn(async move {
310 let _ =
311 stream_bench_run_events(http, token, base_url, run_id, tx, Some(ready_tx)).await;
312 });
313
314 match ready_rx.await {
315 Ok(Ok(())) => Ok((rx, EventSubscription::from_handle(handle))),
316 Ok(Err(e)) => {
317 handle.abort();
318 Err(e)
319 }
320 Err(_) => {
321 handle.abort();
322 Err(AkribesError::Other(
323 "bench SSE listener died before subscription was confirmed".into(),
324 ))
325 }
326 }
327 }
328
329 /// `POST /bench-runs/{id}/cancel`. Flips the cancel token; in-flight cases
330 /// complete naturally. Returns the run row as it stands.
331 pub async fn cancel(&self, run_id: i64) -> Result<BenchRun> {
332 let url = format!("{}/cancel", self.run_url(run_id));
333 let empty: serde_json::Value = serde_json::json!({});
334 self.c().post(&url, &empty).await
335 }
336
337 /// `PATCH /bench-runs/{id}/tag-session` — attribute the run to an MCP
338 /// session id so the coordinator's finalize step posts the cost into
339 /// `mcp_session_cost`.
340 pub async fn tag_session(
341 &self,
342 run_id: i64,
343 mcp_session_id: &str,
344 ) -> Result<BenchRunTagSessionResponse> {
345 #[derive(Serialize)]
346 struct Body<'a> {
347 mcp_session_id: &'a str,
348 }
349 let url = format!("{}/tag-session", self.run_url(run_id));
350 self.c().patch(&url, &Body { mcp_session_id }).await
351 }
352
353 /// `POST /executions/{exec_id}/promote-to-case` — promote a completed
354 /// execution into a bench case, with optional `edits` overlay.
355 ///
356 /// This is run-scoped only in the loose sense: it lives on `/executions`
357 /// rather than `/bench-runs`, but it's the natural counterpart to the
358 /// "promote-from-execution" flow on the bench surface and doesn't need a
359 /// project_id (the server resolves the owning project from the source
360 /// execution).
361 pub async fn promote_execution(
362 &self,
363 execution_id: &str,
364 req: &PromoteExecutionRequest,
365 ) -> Result<BenchCase> {
366 let url = format!(
367 "{}/executions/{}/promote-to-case",
368 self.inner.base_url,
369 urlencoding::encode(execution_id),
370 );
371 self.c().post(&url, req).await
372 }
373
374 /// `GET /bench-runs/{a}/compare/{b}` — diff two runs of the same bench.
375 pub async fn compare(&self, run_a: i64, run_b: i64) -> Result<CompareReport> {
376 let url = format!(
377 "{}/bench-runs/{}/compare/{}",
378 self.inner.base_url, run_a, run_b,
379 );
380 self.c()
381 .get_opt::<CompareReport>(&url)
382 .await?
383 .ok_or_else(|| crate::error::AkribesError::HttpStatus {
384 status: 404,
385 message: format!("compare runs {}↔{} returned 404", run_a, run_b),
386 })
387 }
388
389 // ── Case-id keyed operations ────────────────────────────────────────────
390 //
391 // `PATCH /cases/{id}` and `DELETE /cases/{id}` live under `/cases` (no
392 // project scope) — same naming as `/bench-runs/{id}`. Surface them on
393 // the same global client.
394
395 /// `GET /executions/{case_id}` — fetch a single case (cases are
396 /// `executions` rows with `kind='case'`). Returned as `Value` for
397 /// compatibility with the MCP tool, which doesn't trust the row to
398 /// always type-check as `BenchCase` (legacy promoted-execution rows can
399 /// have null kind on older servers).
400 pub async fn get_case(&self, case_id: &str) -> Result<serde_json::Value> {
401 let url = format!(
402 "{}/executions/{}",
403 self.inner.base_url,
404 urlencoding::encode(case_id),
405 );
406 Ok(self
407 .c()
408 .get_opt::<serde_json::Value>(&url)
409 .await?
410 .unwrap_or(serde_json::Value::Null))
411 }
412
413 /// `PATCH /cases/{id}` — sparse update.
414 pub async fn patch_case(
415 &self,
416 case_id: &str,
417 req: &PatchBenchCaseRequest,
418 ) -> Result<BenchCase> {
419 let url = format!(
420 "{}/cases/{}",
421 self.inner.base_url,
422 urlencoding::encode(case_id),
423 );
424 self.c().patch(&url, req).await
425 }
426
427 /// `DELETE /cases/{id}`. The server emits a `{"deleted": true}` JSON body;
428 /// we discard it and return `()`.
429 pub async fn delete_case(&self, case_id: &str) -> Result<()> {
430 let url = format!(
431 "{}/cases/{}",
432 self.inner.base_url,
433 urlencoding::encode(case_id),
434 );
435 self.c().delete(&url).await?;
436 Ok(())
437 }
438
439 /// `GET /benches/{id}` — fast bench-by-id lookup. Returns the bench
440 /// row joined with the owning `project_id` + `script_name` so the
441 /// caller can chain into list_cases / list_runs without an N+1
442 /// project walk. 404 → `Ok(None)`.
443 pub async fn bench_by_id(&self, bench_id: i64) -> Result<Option<serde_json::Value>> {
444 let url = format!("{}/benches/{}", self.inner.base_url, bench_id);
445 self.c().get_json_value_opt(&url).await
446 }
447
448 /// `GET /mcp-sessions/{id}/cost` — aggregated cost for an MCP
449 /// session. Returns `{session_id, total_cost_usd, breakdown}`.
450 /// Lets the MCP server (and any other client) read accumulated
451 /// cost via HTTP rather than querying the `mcp_session_cost`
452 /// table directly.
453 pub async fn mcp_session_cost(&self, session_id: &str) -> Result<serde_json::Value> {
454 let url = format!(
455 "{}/mcp-sessions/{}/cost",
456 self.inner.base_url,
457 urlencoding::encode(session_id),
458 );
459 self.c().get_json_value(&url).await
460 }
461}