Skip to main content

akribes_sdk/sub/
bench.rs

//! Sub-client for the akribes-server bench substrate.
//!
//! Wraps per-script bench config CRUD, case CRUD + promote-from-execution,
//! and the bench-run lifecycle (trigger / list / get / results /
//! subscribe-events / cancel / delete / compare / tag-session). Two surfaces:
//!
//! - Project-scoped operations live on [`BenchClient`] (obtained via
//!   `client.project(id).bench()`). The project id is fixed when the client
//!   is obtained; each call takes a script name and is routed under
//!   `/projects/{id}/scripts/{name}/bench/...`.
//! - Run-scoped operations (anything keyed on the global `bench_runs.id`) live
//!   on [`BenchRunsClient`] (obtained via `client.bench_runs()`). Those
//!   endpoints — `/bench-runs/{id}/...` — are reachable cross-project, so
//!   they don't need a `project_id`. The same client also carries the
//!   unscoped case, execution, bench-by-id and MCP-session-cost lookups.
//!
//! Timestamps travel on the wire as RFC 3339 strings; see [`crate::models`].
16
17use std::sync::Arc;
18
19use serde::Serialize;
20use tokio::sync::{mpsc, oneshot};
21
22use crate::client::{AkribesClient, Inner};
23use crate::error::{AkribesError, Result};
24use crate::models::*;
25use crate::sub::events::{EventSubscription, stream_bench_run_events};
26
27// ── Project-scoped bench client ──────────────────────────────────────────────
28
29/// Bench operations rooted at a project + script. Obtained via
30/// `client.project(id).bench()`.
31#[derive(Clone, Debug)]
32pub struct BenchClient {
33    pub(crate) inner: Arc<Inner>,
34    pub(crate) project_id: i64,
35}
36
37impl BenchClient {
38    pub(crate) fn new(inner: Arc<Inner>, project_id: i64) -> Self {
39        Self { inner, project_id }
40    }
41
42    fn c(&self) -> AkribesClient {
43        AkribesClient {
44            inner: Arc::clone(&self.inner),
45        }
46    }
47
48    fn bench_url(&self, script_name: &str) -> String {
49        format!(
50            "{}/projects/{}/scripts/{}/bench",
51            self.inner.base_url,
52            self.project_id,
53            urlencoding::encode(script_name),
54        )
55    }
56
57    // ── Project-wide summary ────────────────────────────────────────────────
58
59    /// `GET /projects/{id}/benches` — one [`ProjectBenchSummary`] per
60    /// configured bench in the project. Each row joins the bench with its
61    /// script name, judge name (when set), total case count, and the
62    /// most-recent run's identity + mean headline score. Backs the
63    /// project-level evals landing page; mirrors the TS SDK's
64    /// `listProjectSummaries`. 404 → empty list.
65    pub async fn list_project_summaries(&self) -> Result<Vec<ProjectBenchSummary>> {
66        let url = format!(
67            "{}/projects/{}/benches",
68            self.inner.base_url, self.project_id
69        );
70        self.c().get_list(&url).await
71    }
72
73    // ── Bench config CRUD ───────────────────────────────────────────────────
74
75    /// `GET /projects/{id}/scripts/{name}/bench` — 404 → `Ok(None)`.
76    pub async fn get(&self, script_name: &str) -> Result<Option<Bench>> {
77        self.c().get_opt(&self.bench_url(script_name)).await
78    }
79
80    /// `POST /projects/{id}/scripts/{name}/bench` — create or update.
81    pub async fn create_or_update(
82        &self,
83        script_name: &str,
84        req: &CreateOrUpdateBenchRequest,
85    ) -> Result<Bench> {
86        self.c().post(&self.bench_url(script_name), req).await
87    }
88
89    /// `DELETE /projects/{id}/scripts/{name}/bench`. Returns `true` if a row
90    /// was deleted, `false` if absent.
91    pub async fn delete(&self, script_name: &str) -> Result<bool> {
92        self.c().delete(&self.bench_url(script_name)).await
93    }
94
95    /// `GET /projects/{id}/scripts/{name}/signature` — the parsed script
96    /// signature (inputs + outputs) plus named type defs. Returned as
97    /// `serde_json::Value` because the server emits an ad-hoc tagged shape
98    /// that doesn't have a stable Rust mirror; the studio + MCP both treat it
99    /// as a blob.
100    pub async fn get_signature(&self, script_name: &str) -> Result<serde_json::Value> {
101        let url = format!(
102            "{}/projects/{}/scripts/{}/signature",
103            self.inner.base_url,
104            self.project_id,
105            urlencoding::encode(script_name),
106        );
107        Ok(self
108            .c()
109            .get_opt::<serde_json::Value>(&url)
110            .await?
111            .unwrap_or(serde_json::json!({})))
112    }
113
114    /// `GET /projects/{id}/scripts/{name}/bench/contract-preview` — workflow +
115    /// judge signatures with structured `breaks` list. Returned as a `Value`
116    /// because the wire shape contains the (unstable, JSON-only) signature
117    /// representation.
118    pub async fn contract_preview(
119        &self,
120        script_name: &str,
121        judge_script_id: i64,
122        channel: Option<&str>,
123    ) -> Result<serde_json::Value> {
124        #[derive(Serialize)]
125        struct Q<'a> {
126            judge: i64,
127            #[serde(skip_serializing_if = "Option::is_none")]
128            channel: Option<&'a str>,
129        }
130        let base = format!("{}/contract-preview", self.bench_url(script_name));
131        let url = AkribesClient::url_with_query(
132            &base,
133            &Q {
134                judge: judge_script_id,
135                channel,
136            },
137        );
138        Ok(self
139            .c()
140            .get_opt::<serde_json::Value>(&url)
141            .await?
142            .unwrap_or(serde_json::json!({})))
143    }
144
145    // ── Cases ───────────────────────────────────────────────────────────────
146
147    /// `GET /projects/{id}/scripts/{name}/bench/cases`. 404 → empty list.
148    pub async fn list_cases(&self, script_name: &str) -> Result<Vec<BenchCase>> {
149        let url = format!("{}/cases", self.bench_url(script_name));
150        self.c().get_list(&url).await
151    }
152
153    /// `POST /projects/{id}/scripts/{name}/bench/cases` — form-builder create.
154    pub async fn create_case(
155        &self,
156        script_name: &str,
157        req: &CreateBenchCaseRequest,
158    ) -> Result<BenchCase> {
159        let url = format!("{}/cases", self.bench_url(script_name));
160        self.c().post(&url, req).await
161    }
162
163    /// `GET /projects/{id}/scripts/{name}/bench/cases/contract-drift`.
164    pub async fn case_contract_drift(&self, script_name: &str) -> Result<DriftReport> {
165        let url = format!("{}/cases/contract-drift", self.bench_url(script_name));
166        Ok(self
167            .c()
168            .get_opt::<DriftReport>(&url)
169            .await?
170            .unwrap_or(DriftReport {
171                drifted: Vec::new(),
172                script_version_id: None,
173                published_at: None,
174                published_by: None,
175                summary: String::new(),
176            }))
177    }
178
179    // ── Runs (project-scoped surface) ───────────────────────────────────────
180
181    /// `GET /projects/{id}/scripts/{name}/bench/runs` — paginated via
182    /// `limit`/`offset`.
183    pub async fn list_runs(
184        &self,
185        script_name: &str,
186        limit: Option<i64>,
187        offset: Option<i64>,
188    ) -> Result<Vec<BenchRun>> {
189        #[derive(Serialize)]
190        struct Q {
191            #[serde(skip_serializing_if = "Option::is_none")]
192            limit: Option<i64>,
193            #[serde(skip_serializing_if = "Option::is_none")]
194            offset: Option<i64>,
195        }
196        let base = format!("{}/runs", self.bench_url(script_name));
197        let url = AkribesClient::url_with_query(&base, &Q { limit, offset });
198        self.c().get_list(&url).await
199    }
200
201    /// `POST /projects/{id}/scripts/{name}/bench/runs` — trigger a run.
202    /// `case_ids` constrains the fan-out to a subset (partial run).
203    pub async fn trigger_run(
204        &self,
205        script_name: &str,
206        req: &TriggerBenchRunRequest,
207    ) -> Result<BenchRun> {
208        let url = format!("{}/runs", self.bench_url(script_name));
209        self.c().post(&url, req).await
210    }
211}
212
213// ── Run-scoped (cross-project) client ────────────────────────────────────────
214
215/// Operations keyed on a global `bench_runs.id`. These endpoints live under
216/// `/bench-runs/{id}/...` and don't need a project scope (the server resolves
217/// the owning project from the run row). Obtained via `client.bench_runs()`.
218#[derive(Clone, Debug)]
219pub struct BenchRunsClient {
220    pub(crate) inner: Arc<Inner>,
221}
222
223impl BenchRunsClient {
224    pub(crate) fn new(inner: Arc<Inner>) -> Self {
225        Self { inner }
226    }
227
228    fn c(&self) -> AkribesClient {
229        AkribesClient {
230            inner: Arc::clone(&self.inner),
231        }
232    }
233
234    fn run_url(&self, run_id: i64) -> String {
235        format!("{}/bench-runs/{}", self.inner.base_url, run_id)
236    }
237
238    /// `GET /bench-runs/{id}` — 404 → `Ok(None)`.
239    pub async fn get(&self, run_id: i64) -> Result<Option<BenchRun>> {
240        self.c().get_opt(&self.run_url(run_id)).await
241    }
242
243    /// `DELETE /bench-runs/{id}` — returns `()` (server emits 204 No Content).
244    /// Cancels the run first (best-effort) before dropping the row.
245    pub async fn delete(&self, run_id: i64) -> Result<()> {
246        // `AkribesClient::delete` swallows the body and reports a bool
247        // (deleted vs already-absent). The bench delete endpoint emits 204,
248        // which is "deleted" — we discard the bool to give consumers a clean
249        // `()` return.
250        self.c().delete(&self.run_url(run_id)).await?;
251        Ok(())
252    }
253
254    /// `GET /bench-runs/{id}/results`. 404 → empty list.
255    pub async fn list_results(&self, run_id: i64) -> Result<Vec<BenchResult>> {
256        let url = format!("{}/results", self.run_url(run_id));
257        self.c().get_list(&url).await
258    }
259
260    /// Subscribe to a bench run's **live** result stream over SSE
261    /// (`GET /bench-runs/{id}/events`, `Accept: text/event-stream`).
262    ///
263    /// Returns a receiver of typed [`BenchRunEvent`]s plus an
264    /// [`EventSubscription`] handle that cancels the background listener
265    /// on drop. The server emits a [`BenchRunEvent::Result`] per recorded
266    /// case, a [`BenchRunEvent::Lagged`] if the broadcast subscriber falls
267    /// behind, and a final [`BenchRunEvent::Terminal`] carrying the run's
268    /// terminal status — after which the channel closes.
269    ///
270    /// This awaits the SSE subscription being live on the server before
271    /// returning, so a non-2xx (e.g. 403 on a project the token can't
272    /// read, or the run id not resolving to a project) surfaces here
273    /// rather than as a silently-empty stream. Reuses the crate's shared
274    /// SSE byte-deframer and field parser (the same machinery behind
275    /// `events().event_stream` / `executions().run_stream`).
276    ///
277    /// Mirrors the TS SDK's `subscribeRunEvents`; the `terminal` frame is
278    /// surfaced as a typed variant so Rust callers can detect
279    /// end-of-stream without a side channel. Drop the returned
280    /// `EventSubscription` (or let it fall out of scope) to unsubscribe.
281    ///
282    /// ```no_run
283    /// # use akribes_sdk::AkribesClient;
284    /// # use akribes_sdk::models::BenchRunEvent;
285    /// # async fn example(client: AkribesClient) -> akribes_sdk::Result<()> {
286    /// let (mut rx, _sub) = client.bench_runs().subscribe_run_events(42).await?;
287    /// while let Some(evt) = rx.recv().await {
288    ///     match evt {
289    ///         BenchRunEvent::Result(r) => println!("case {} → {}", r.case_id, r.status),
290    ///         BenchRunEvent::Lagged { dropped } => eprintln!("dropped {dropped}"),
291    ///         BenchRunEvent::Terminal { status } => {
292    ///             println!("run ended: {status}");
293    ///             break;
294    ///         }
295    ///     }
296    /// }
297    /// # Ok(()) }
298    /// ```
299    pub async fn subscribe_run_events(
300        &self,
301        run_id: i64,
302    ) -> Result<(mpsc::UnboundedReceiver<BenchRunEvent>, EventSubscription)> {
303        let (tx, rx) = mpsc::unbounded_channel();
304        let (ready_tx, ready_rx) = oneshot::channel::<Result<()>>();
305        let http = self.inner.http.clone();
306        let token = self.inner.token.clone();
307        let base_url = self.inner.base_url.clone();
308
309        let handle = tokio::spawn(async move {
310            let _ =
311                stream_bench_run_events(http, token, base_url, run_id, tx, Some(ready_tx)).await;
312        });
313
314        match ready_rx.await {
315            Ok(Ok(())) => Ok((rx, EventSubscription::from_handle(handle))),
316            Ok(Err(e)) => {
317                handle.abort();
318                Err(e)
319            }
320            Err(_) => {
321                handle.abort();
322                Err(AkribesError::Other(
323                    "bench SSE listener died before subscription was confirmed".into(),
324                ))
325            }
326        }
327    }
328
329    /// `POST /bench-runs/{id}/cancel`. Flips the cancel token; in-flight cases
330    /// complete naturally. Returns the run row as it stands.
331    pub async fn cancel(&self, run_id: i64) -> Result<BenchRun> {
332        let url = format!("{}/cancel", self.run_url(run_id));
333        let empty: serde_json::Value = serde_json::json!({});
334        self.c().post(&url, &empty).await
335    }
336
337    /// `PATCH /bench-runs/{id}/tag-session` — attribute the run to an MCP
338    /// session id so the coordinator's finalize step posts the cost into
339    /// `mcp_session_cost`.
340    pub async fn tag_session(
341        &self,
342        run_id: i64,
343        mcp_session_id: &str,
344    ) -> Result<BenchRunTagSessionResponse> {
345        #[derive(Serialize)]
346        struct Body<'a> {
347            mcp_session_id: &'a str,
348        }
349        let url = format!("{}/tag-session", self.run_url(run_id));
350        self.c().patch(&url, &Body { mcp_session_id }).await
351    }
352
353    /// `POST /executions/{exec_id}/promote-to-case` — promote a completed
354    /// execution into a bench case, with optional `edits` overlay.
355    ///
356    /// This is run-scoped only in the loose sense: it lives on `/executions`
357    /// rather than `/bench-runs`, but it's the natural counterpart to the
358    /// "promote-from-execution" flow on the bench surface and doesn't need a
359    /// project_id (the server resolves the owning project from the source
360    /// execution).
361    pub async fn promote_execution(
362        &self,
363        execution_id: &str,
364        req: &PromoteExecutionRequest,
365    ) -> Result<BenchCase> {
366        let url = format!(
367            "{}/executions/{}/promote-to-case",
368            self.inner.base_url,
369            urlencoding::encode(execution_id),
370        );
371        self.c().post(&url, req).await
372    }
373
374    /// `GET /bench-runs/{a}/compare/{b}` — diff two runs of the same bench.
375    pub async fn compare(&self, run_a: i64, run_b: i64) -> Result<CompareReport> {
376        let url = format!(
377            "{}/bench-runs/{}/compare/{}",
378            self.inner.base_url, run_a, run_b,
379        );
380        self.c()
381            .get_opt::<CompareReport>(&url)
382            .await?
383            .ok_or_else(|| crate::error::AkribesError::HttpStatus {
384                status: 404,
385                message: format!("compare runs {}↔{} returned 404", run_a, run_b),
386            })
387    }
388
389    // ── Case-id keyed operations ────────────────────────────────────────────
390    //
391    // `PATCH /cases/{id}` and `DELETE /cases/{id}` live under `/cases` (no
392    // project scope) — same naming as `/bench-runs/{id}`. Surface them on
393    // the same global client.
394
395    /// `GET /executions/{case_id}` — fetch a single case (cases are
396    /// `executions` rows with `kind='case'`). Returned as `Value` for
397    /// compatibility with the MCP tool, which doesn't trust the row to
398    /// always type-check as `BenchCase` (legacy promoted-execution rows can
399    /// have null kind on older servers).
400    pub async fn get_case(&self, case_id: &str) -> Result<serde_json::Value> {
401        let url = format!(
402            "{}/executions/{}",
403            self.inner.base_url,
404            urlencoding::encode(case_id),
405        );
406        Ok(self
407            .c()
408            .get_opt::<serde_json::Value>(&url)
409            .await?
410            .unwrap_or(serde_json::Value::Null))
411    }
412
413    /// `PATCH /cases/{id}` — sparse update.
414    pub async fn patch_case(
415        &self,
416        case_id: &str,
417        req: &PatchBenchCaseRequest,
418    ) -> Result<BenchCase> {
419        let url = format!(
420            "{}/cases/{}",
421            self.inner.base_url,
422            urlencoding::encode(case_id),
423        );
424        self.c().patch(&url, req).await
425    }
426
427    /// `DELETE /cases/{id}`. The server emits a `{"deleted": true}` JSON body;
428    /// we discard it and return `()`.
429    pub async fn delete_case(&self, case_id: &str) -> Result<()> {
430        let url = format!(
431            "{}/cases/{}",
432            self.inner.base_url,
433            urlencoding::encode(case_id),
434        );
435        self.c().delete(&url).await?;
436        Ok(())
437    }
438
439    /// `GET /benches/{id}` — fast bench-by-id lookup. Returns the bench
440    /// row joined with the owning `project_id` + `script_name` so the
441    /// caller can chain into list_cases / list_runs without an N+1
442    /// project walk. 404 → `Ok(None)`.
443    pub async fn bench_by_id(&self, bench_id: i64) -> Result<Option<serde_json::Value>> {
444        let url = format!("{}/benches/{}", self.inner.base_url, bench_id);
445        self.c().get_json_value_opt(&url).await
446    }
447
448    /// `GET /mcp-sessions/{id}/cost` — aggregated cost for an MCP
449    /// session. Returns `{session_id, total_cost_usd, breakdown}`.
450    /// Lets the MCP server (and any other client) read accumulated
451    /// cost via HTTP rather than querying the `mcp_session_cost`
452    /// table directly.
453    pub async fn mcp_session_cost(&self, session_id: &str) -> Result<serde_json::Value> {
454        let url = format!(
455            "{}/mcp-sessions/{}/cost",
456            self.inner.base_url,
457            urlencoding::encode(session_id),
458        );
459        self.c().get_json_value(&url).await
460    }
461}