// akribes_sdk/client.rs
//! The main Akribes SDK client.
2use std::collections::{HashMap, HashSet};
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::{Arc, Mutex};
5use std::time::Duration;
6
7use serde::Serialize;
8use tokio::sync::RwLock;
9
10use crate::error::{AkribesError, Result};
11
12// ── Shared state ────────────────────────────────────────────────────────────
13
14#[derive(Debug)]
15pub(crate) struct Inner {
16 pub(crate) base_url: String,
17 pub(crate) project_id: Option<i64>,
18 pub(crate) name: String,
19 pub(crate) id: String,
20 pub(crate) http: reqwest::Client,
21 pub(crate) token: Arc<RwLock<Option<String>>>,
22 /// Optional `X-Akribes-User` header value for metrics attribution. Set
23 /// when a backend with a service token wants per-end-user accounting.
24 /// Advisory only — does not grant any permission.
25 pub(crate) on_behalf_of: Arc<RwLock<Option<String>>>,
26 pub(crate) heartbeat_handle: Mutex<Option<tokio::task::AbortHandle>>,
27 /// Set to `true` when the client is being dropped / destroyed so the
28 /// heartbeat task can stop cleanly without a TOCTOU race.
29 pub(crate) shutdown: Arc<AtomicBool>,
30 /// Cached input schemas per script (populated by init()).
31 pub(crate) schema_cache: Mutex<HashMap<String, Vec<(String, String)>>>,
32 /// Scripts whose schema has changed since init (marked by SSE events).
33 pub(crate) broken_scripts: Mutex<HashSet<String>>,
34 /// Maximum time `documents().ingest()` will keep polling a still-converting
35 /// blob before surfacing `AkribesError::Transient`. See
36 /// [`AkribesClientBuilder::ingest_poll_timeout`].
37 pub(crate) ingest_poll_timeout: Duration,
38}
39
/// Default request timeout applied by `default_http_client`. Individual
/// requests can still override it via their own
/// `reqwest::RequestBuilder::timeout`.
const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(60);
/// Default TCP/TLS connect timeout applied by `default_http_client`.
const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
45
/// Default poll budget, in seconds, for
/// [`crate::sub::documents::DocumentsClient::ingest`].
///
/// Multi-page real-world PDFs going through the VLM conversion path routinely
/// take 1–5 minutes server-side; the previous 30 s ceiling forced every SDK
/// consumer to handle `AkribesError::Transient` themselves with their own retry
/// loop. 300 s (5 min) covers the long tail comfortably without making a
/// server-side hang invisible.
///
/// Override per-client with [`AkribesClientBuilder::ingest_poll_timeout`], or
/// set `AKRIBES_SDK_INGEST_TIMEOUT_SECS` in the process environment for a global
/// override that takes effect at client construction. When both are set, the
/// builder value wins over the env var.
pub(crate) const DEFAULT_INGEST_POLL_TIMEOUT_SECS: u64 = 300;
59
/// Resolve the `AKRIBES_SDK_INGEST_TIMEOUT_SECS` override, if any.
///
/// The value is trimmed and parsed as a whole number of seconds. `None` is
/// returned when the variable is missing, is not valid Unicode, does not parse
/// as a `u64`, or parses to `0`. A zero deadline would make every ingest fail
/// with `Transient` on its first poll, so it is treated as misconfiguration and
/// the caller falls back to [`DEFAULT_INGEST_POLL_TIMEOUT_SECS`] instead.
pub(crate) fn ingest_poll_timeout_from_env() -> Option<Duration> {
    std::env::var("AKRIBES_SDK_INGEST_TIMEOUT_SECS")
        .ok()
        .and_then(|value| value.trim().parse::<u64>().ok())
        .filter(|&secs| secs != 0)
        .map(Duration::from_secs)
}
75
76/// Build the SDK's default `reqwest::Client` with sensible timeouts so a hung
77/// server can never deadlock the caller. Falls back to `Client::new()` if the
78/// builder somehow fails (shouldn't happen with these static settings).
79fn default_http_client() -> reqwest::Client {
80 reqwest::Client::builder()
81 .timeout(DEFAULT_REQUEST_TIMEOUT)
82 .connect_timeout(DEFAULT_CONNECT_TIMEOUT)
83 .build()
84 .unwrap_or_else(|_| reqwest::Client::new())
85}
86
/// Cap on bytes read from an error response body (64 KiB), enforced by
/// `read_body_capped`. A misbehaving or malicious server can return
/// arbitrarily large error bodies, and buffering them all with `.text()`
/// would let a single bad response OOM the SDK consumer.
const MAX_ERROR_BODY_BYTES: usize = 64 * 1024;

/// Cap on bytes quoted back in a decode-failure error (see `body_snippet`).
/// Short enough to fit in a single MCP tool response; long enough to eyeball
/// the mismatch.
const DECODE_ERROR_SNIPPET_BYTES: usize = 512;
96
97/// Read a response body lossily, capped at [`MAX_ERROR_BODY_BYTES`].
98/// If the cap is hit, a trailing `… (truncated)` marker is appended.
99async fn read_body_capped(res: reqwest::Response) -> String {
100 use futures::StreamExt;
101 let mut buf: Vec<u8> = Vec::new();
102 let mut truncated = false;
103 let mut stream = res.bytes_stream();
104 while let Some(chunk) = stream.next().await {
105 let Ok(chunk) = chunk else { break };
106 let remaining = MAX_ERROR_BODY_BYTES.saturating_sub(buf.len());
107 if remaining == 0 {
108 truncated = true;
109 break;
110 }
111 let take = chunk.len().min(remaining);
112 buf.extend_from_slice(&chunk[..take]);
113 if take < chunk.len() {
114 truncated = true;
115 break;
116 }
117 }
118 let mut s = String::from_utf8_lossy(&buf).into_owned();
119 if truncated {
120 s.push_str("… (truncated)");
121 }
122 s
123}
124
125/// Produce a short UTF-8-lossy snippet of a response body for error messages.
126fn body_snippet(bytes: &[u8]) -> String {
127 let total = bytes.len();
128 let cut = total.min(DECODE_ERROR_SNIPPET_BYTES);
129 let s: String = String::from_utf8_lossy(&bytes[..cut]).into_owned();
130 if total > cut {
131 format!("{s}… (truncated, {total} bytes total)")
132 } else {
133 s
134 }
135}
136
137/// Buffer a response body and deserialize it as JSON. On decode failure,
138/// returns an [`AkribesError::Other`] that names the target type, the URL,
139/// and a short snippet of the actual body — so callers don't just see
140/// reqwest's opaque "error decoding response body".
141///
142/// 404 responses that reach this helper are converted to [`AkribesError::HttpStatus`]
143/// rather than attempting to decode them as the happy-path body. Callers that
144/// want to treat 404 as absent (GET-by-id, list) should check status *before*
145/// calling this helper — see [`AkribesClient::get_opt`] for the pattern.
146pub(crate) async fn decode_json<T: serde::de::DeserializeOwned>(
147 res: reqwest::Response,
148) -> Result<T> {
149 let status = res.status();
150 let url = res.url().to_string();
151 if status == reqwest::StatusCode::NOT_FOUND {
152 let body = read_body_capped(res).await;
153 let message = if body.trim().is_empty() {
154 format!("HTTP 404 Not Found (url: {url})")
155 } else {
156 format!("HTTP 404 (url: {url}): {body}")
157 };
158 return Err(AkribesError::HttpStatus {
159 status: 404,
160 message,
161 });
162 }
163 let bytes = res.bytes().await?;
164 serde_json::from_slice::<T>(&bytes).map_err(|e| {
165 AkribesError::Other(format!(
166 "failed to decode response from {url} as {ty}: {e}; body: {snippet}",
167 ty = std::any::type_name::<T>(),
168 snippet = body_snippet(&bytes),
169 ))
170 })
171}
172
173// ── AkribesClient ──────────────────────────────────────────────────────────────
174
/// Typed client for the Akribes workflow platform.
///
/// Cheaply cloneable — all clones share the same HTTP client, auth token, and
/// heartbeat task (they all point at one shared state behind an `Arc`).
///
/// # Authentication
///
/// akribes-server uses two token types. **Pick one** and pass it to the
/// builder's `token(...)`:
///
/// **1. Service token** (long-lived, env var, full-Admin within scope) — for
/// trusted backends. The secret is the part after `:` in
/// `AKRIBES_SERVICE_TOKEN_<NAME>=<scope>:<secret>` from the server's env.
///
/// ```no_run
/// use akribes_sdk::AkribesClient;
/// let client = AkribesClient::builder("https://akribes.example.com")
///     .project_id(2)
///     .token(std::env::var("AKRIBES_SERVICE_TOKEN").unwrap())
///     .build();
/// ```
///
/// **2. Scoped token** (`akribes_tk_...`; legacy `aura_tk_...` is still
/// accepted). These expire and can be revoked. Use them for browsers, CLIs, or
/// anything you don't want to give a long-lived secret. Mint one via
/// [`AkribesClient::tokens`]'s [`mint`](crate::sub::tokens::TokensClient::mint):
///
/// ```no_run
/// # use akribes_sdk::{AkribesClient, models::{MintTokenRequest, TokenScopes, ProjectScope, TokenRole, WildcardMarker}};
/// # async fn ex(backend: AkribesClient) -> Result<(), Box<dyn std::error::Error>> {
/// // From a backend holding a service token, mint a scoped token for a user:
/// let minted = backend.tokens().mint(&MintTokenRequest {
///     user_email: Some("alice@acme.com".to_string()),
///     scopes: TokenScopes {
///         projects: ProjectScope::Wildcard(WildcardMarker),
///         role: TokenRole::Admin,
///         scripts: None,
///         executions: None,
///         can_mint: false,
///         features: vec![],
///         org_id: None,
///     },
///     expires_in: 8 * 3600, // 8h browser session
///     label: "web-session".to_string(),
/// }).await?;
/// // Ship `minted.token` to the browser.
/// # Ok(()) }
/// ```
///
/// When a backend acts on behalf of a user, set `X-Akribes-User` for metrics
/// attribution. This is an advisory header and does not grant permission. Set
/// it at construction via [`AkribesClientBuilder::on_behalf_of`], or at
/// runtime via [`AkribesClient::set_on_behalf_of`]. Servers also accept the
/// legacy `X-Aura-User` form for backwards compat with pre-rebrand clients.
#[derive(Clone, Debug)]
pub struct AkribesClient {
    /// Shared state; every clone (and every sub-client) holds the same `Arc`.
    pub(crate) inner: Arc<Inner>,
}
232
233impl AkribesClient {
234 /// Create a new project-scoped client.
235 ///
236 /// Deprecated: prefer [`AkribesClient::builder`], which is now the blessed
237 /// constructor and supports a wider range of configurations (no project,
238 /// custom http client, etc.).
239 #[deprecated(since = "0.4.0", note = "use AkribesClient::builder(base_url) instead")]
240 pub fn new(base_url: &str, project_id: i64, name: &str, id: &str) -> Self {
241 Self {
242 inner: Arc::new(Inner {
243 base_url: base_url.trim_end_matches('/').to_string(),
244 project_id: Some(project_id),
245 name: name.to_string(),
246 id: id.to_string(),
247 http: default_http_client(),
248 token: Arc::new(RwLock::new(None)),
249 on_behalf_of: Arc::new(RwLock::new(None)),
250 heartbeat_handle: Mutex::new(None),
251 shutdown: Arc::new(AtomicBool::new(false)),
252 schema_cache: Mutex::new(HashMap::new()),
253 broken_scripts: Mutex::new(HashSet::new()),
254 ingest_poll_timeout: ingest_poll_timeout_from_env()
255 .unwrap_or(Duration::from_secs(DEFAULT_INGEST_POLL_TIMEOUT_SECS)),
256 }),
257 }
258 }
259
260 /// Create a builder for more control over client construction.
261 ///
262 /// ```no_run
263 /// # use akribes_sdk::AkribesClient;
264 /// let client = AkribesClient::builder("http://localhost:3001")
265 /// .project_id(1)
266 /// .name("my-service")
267 /// .token("aura_abc123")
268 /// .build();
269 /// ```
270 pub fn builder(base_url: impl Into<String>) -> AkribesClientBuilder {
271 AkribesClientBuilder {
272 base_url: base_url.into().trim_end_matches('/').to_string(),
273 project_id: None,
274 name: None,
275 id: None,
276 token: None,
277 on_behalf_of: None,
278 http_client: None,
279 ingest_poll_timeout: None,
280 }
281 }
282
283 /// Update the authentication token at runtime.
284 ///
285 /// Pass `None` to clear the token (requests will be unauthenticated).
286 pub async fn set_token(&self, token: Option<String>) {
287 *self.inner.token.write().await = token;
288 }
289
290 /// Set or clear the `X-Akribes-User` header sent on every outbound
291 /// request, used by the server for metrics attribution when a backend
292 /// (typically holding a service token) acts on behalf of an end user.
293 ///
294 /// **This header is advisory — it does not grant any permission.**
295 /// Authorization remains based on the bearer token's scope. Servers also
296 /// honor the legacy `X-Aura-User` form for backwards compat with
297 /// pre-rebrand clients, but new code should not rely on that.
298 ///
299 /// Mirrors [`AkribesClientBuilder::on_behalf_of`] for runtime updates
300 /// (e.g. when the same long-lived client services many users).
301 pub async fn set_on_behalf_of(&self, email: Option<String>) {
302 *self.inner.on_behalf_of.write().await = email;
303 }
304
305 /// Return the configured project ID, or `None` if this is a global client.
306 pub fn project_id(&self) -> Option<i64> {
307 self.inner.project_id
308 }
309
310 /// Return the base URL this client points at (no trailing slash). Useful
311 /// for callers that need to compose a URL outside the typed sub-clients —
312 /// e.g. the MCP bench tools poll an SSE endpoint as a plain GET to keep
313 /// their wire-shape contract intact.
314 pub fn base_url(&self) -> &str {
315 &self.inner.base_url
316 }
317
318 /// Authenticated raw GET that returns the response body as a generic
319 /// `serde_json::Value`. Goes through the same auth + telemetry pipeline
320 /// as every typed call; only the deserialisation step is generic.
321 ///
322 /// 404 surfaces as [`AkribesError::HttpStatus`] (the typed callers convert
323 /// it to `Ok(None)` via their own wrappers — for the raw form we keep the
324 /// status visible so callers can branch on absence vs. unrelated failures).
325 pub async fn get_json_value(&self, url: &str) -> Result<serde_json::Value> {
326 let res = self.send(self.inner.http.get(url)).await?;
327 if res.status() == reqwest::StatusCode::NOT_FOUND {
328 return Err(AkribesError::HttpStatus {
329 status: 404,
330 message: format!("GET {url} returned 404"),
331 });
332 }
333 decode_json(res).await
334 }
335
336 /// 404-tolerant variant of [`AkribesClient::get_json_value`]. Returns
337 /// `Ok(None)` instead of surfacing the 404 as an error — useful when
338 /// the caller's notion of "absence" is a legitimate response.
339 pub async fn get_json_value_opt(&self, url: &str) -> Result<Option<serde_json::Value>> {
340 let res = self.send(self.inner.http.get(url)).await?;
341 if res.status() == reqwest::StatusCode::NOT_FOUND {
342 return Ok(None);
343 }
344 Ok(Some(decode_json(res).await?))
345 }
346
347 /// Authenticated raw POST with a JSON body, returning the response body
348 /// as a generic `serde_json::Value`. Companion to
349 /// [`AkribesClient::get_json_value`] for endpoints whose response shape
350 /// isn't (yet) typed in [`crate::models`].
351 pub async fn post_json_value(
352 &self,
353 url: &str,
354 body: &serde_json::Value,
355 ) -> Result<serde_json::Value> {
356 let res = self.send(self.inner.http.post(url).json(body)).await?;
357 if res.status().as_u16() == 204 || res.content_length() == Some(0) {
358 return Ok(serde_json::Value::Null);
359 }
360 decode_json(res).await
361 }
362
363 /// Return the configured `documents().ingest()` poll timeout. Useful for
364 /// inspecting the resolved value (builder override → env var → default)
365 /// from tests or diagnostics.
366 pub fn ingest_poll_timeout(&self) -> Duration {
367 self.inner.ingest_poll_timeout
368 }
369
370 // ── Internal helpers ─────────────────────────────────────────────────────
371
372 /// Build a request with the current auth token + on-behalf-of header
373 /// injected.
374 pub(crate) async fn authed(&self, builder: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
375 let token_guard = self.inner.token.read().await;
376 let mut builder = match token_guard.as_deref() {
377 Some(t) => builder.bearer_auth(t),
378 None => builder,
379 };
380 drop(token_guard);
381 let obo_guard = self.inner.on_behalf_of.read().await;
382 if let Some(email) = obo_guard.as_deref() {
383 // Servers also honor the legacy `X-Aura-User` form for backwards
384 // compat with pre-rebrand clients, but new clients emit the
385 // Akribes form. Header is advisory only — authz comes from the
386 // bearer token's scope.
387 builder = builder.header("X-Akribes-User", email);
388 }
389 builder
390 }
391
392 /// Send a request, classifying 4xx/5xx status codes into typed errors.
393 pub(crate) async fn send(&self, req: reqwest::RequestBuilder) -> Result<reqwest::Response> {
394 let req = self.authed(req).await;
395 let res = req.send().await?;
396 let status = res.status();
397
398 if status == reqwest::StatusCode::UNAUTHORIZED || status == reqwest::StatusCode::FORBIDDEN {
399 let body = read_body_capped(res).await;
400 let message = if body.trim().is_empty() {
401 format!(
402 "HTTP {} {}",
403 status.as_u16(),
404 status.canonical_reason().unwrap_or("")
405 )
406 } else {
407 format!("HTTP {}: {}", status.as_u16(), body)
408 };
409 return Err(AkribesError::Fatal {
410 message,
411 execution_id: None,
412 });
413 }
414
415 // #1296: all 4 transient 5xx statuses get the same dispatch path —
416 // the per-status retry semantics are exposed by the new
417 // `status` field on `AkribesError::Transient` so callers can
418 // pick a base backoff via `AkribesError::recommended_backoff_ms`.
419 if status.as_u16() == 429
420 || status == reqwest::StatusCode::INTERNAL_SERVER_ERROR
421 || status == reqwest::StatusCode::BAD_GATEWAY
422 || status == reqwest::StatusCode::SERVICE_UNAVAILABLE
423 || status == reqwest::StatusCode::GATEWAY_TIMEOUT
424 {
425 // Parse Retry-After before consuming the response (#1009).
426 // Numeric seconds only; HTTP-date form is ignored to match the
427 // Python SDK's behavior.
428 let retry_after = res
429 .headers()
430 .get(reqwest::header::RETRY_AFTER)
431 .and_then(|v| v.to_str().ok())
432 .and_then(|s| s.trim().parse::<u64>().ok())
433 .map(std::time::Duration::from_secs);
434 let body = read_body_capped(res).await;
435 let message = if body.trim().is_empty() {
436 format!(
437 "HTTP {} {}",
438 status.as_u16(),
439 status.canonical_reason().unwrap_or("")
440 )
441 } else {
442 format!("HTTP {}: {}", status.as_u16(), body)
443 };
444 return Err(AkribesError::Transient {
445 message,
446 execution_id: None,
447 retry_after,
448 status: Some(status.as_u16()),
449 });
450 }
451
452 if !status.is_success() && status != reqwest::StatusCode::NOT_FOUND {
453 let msg = read_body_capped(res).await;
454 if status.as_u16() == 409 {
455 if let Ok(body) = serde_json::from_str::<serde_json::Value>(&msg) {
456 if body.get("error_type").and_then(|v| v.as_str())
457 == Some("suite_already_exists")
458 {
459 if let Some(id) = body.get("existing_suite_id").and_then(|v| v.as_i64()) {
460 return Err(AkribesError::AlreadyExists {
461 message: body
462 .get("error")
463 .and_then(|v| v.as_str())
464 .unwrap_or("already exists")
465 .to_string(),
466 existing_id: id,
467 });
468 }
469 }
470 }
471 }
472 return Err(AkribesError::HttpStatus {
473 status: status.as_u16(),
474 message: msg,
475 });
476 }
477
478 Ok(res)
479 }
480
481 /// `GET` — returns `None` on 404, deserialises body otherwise.
482 pub(crate) async fn get_opt<T: serde::de::DeserializeOwned>(
483 &self,
484 url: &str,
485 ) -> Result<Option<T>> {
486 let res = self.send(self.inner.http.get(url)).await?;
487 if res.status() == reqwest::StatusCode::NOT_FOUND {
488 return Ok(None);
489 }
490 Ok(Some(decode_json(res).await?))
491 }
492
493 /// `GET` — deserialises body as a list, treats 404 as empty.
494 pub(crate) async fn get_list<T: serde::de::DeserializeOwned>(
495 &self,
496 url: &str,
497 ) -> Result<Vec<T>> {
498 Ok(self.get_opt::<Vec<T>>(url).await?.unwrap_or_default())
499 }
500
501 /// Append a serde-serializable query struct to a URL. Fields set to
502 /// `None` are skipped (via `#[serde(skip_serializing_if = "Option::is_none")]`).
503 pub(crate) fn url_with_query<Q: Serialize>(base: &str, q: &Q) -> String {
504 match serde_urlencoded::to_string(q) {
505 Ok(qs) if !qs.is_empty() => format!("{base}?{qs}"),
506 _ => base.to_string(),
507 }
508 }
509
510 /// `POST` with a JSON body — returns deserialised response.
511 pub(crate) async fn post<B: Serialize, T: serde::de::DeserializeOwned>(
512 &self,
513 url: &str,
514 body: &B,
515 ) -> Result<T> {
516 let res = self.send(self.inner.http.post(url).json(body)).await?;
517 decode_json(res).await
518 }
519
520 /// `PATCH` with a JSON body — returns deserialised response.
521 pub(crate) async fn patch<B: Serialize, T: serde::de::DeserializeOwned>(
522 &self,
523 url: &str,
524 body: &B,
525 ) -> Result<T> {
526 let res = self.send(self.inner.http.patch(url).json(body)).await?;
527 decode_json(res).await
528 }
529
530 /// `PATCH` with a JSON body — expects no body in the response.
531 pub(crate) async fn patch_empty<B: Serialize>(&self, url: &str, body: &B) -> Result<()> {
532 self.send(self.inner.http.patch(url).json(body)).await?;
533 Ok(())
534 }
535
536 /// `PUT` with a JSON body — expects no body in the response.
537 pub(crate) async fn put_empty<B: Serialize>(&self, url: &str, body: &B) -> Result<()> {
538 self.send(self.inner.http.put(url).json(body)).await?;
539 Ok(())
540 }
541
542 /// `PUT` with a JSON body — returns deserialised response.
543 pub(crate) async fn put_json<B: Serialize, T: serde::de::DeserializeOwned>(
544 &self,
545 url: &str,
546 body: &B,
547 ) -> Result<T> {
548 let res = self.send(self.inner.http.put(url).json(body)).await?;
549 decode_json(res).await
550 }
551
552 /// `DELETE` — returns `true` if deleted, `false` if already absent (404).
553 pub(crate) async fn delete(&self, url: &str) -> Result<bool> {
554 let res = self.send(self.inner.http.delete(url)).await?;
555 Ok(res.status() != reqwest::StatusCode::NOT_FOUND)
556 }
557
558 /// `DELETE` — returns deserialised response body.
559 pub(crate) async fn delete_json<T: serde::de::DeserializeOwned>(&self, url: &str) -> Result<T> {
560 let res = self.send(self.inner.http.delete(url)).await?;
561 decode_json(res).await
562 }
563
564 /// `POST` with a multipart form body — returns deserialised response.
565 pub(crate) async fn post_multipart<T: serde::de::DeserializeOwned>(
566 &self,
567 url: &str,
568 form: reqwest::multipart::Form,
569 ) -> Result<T> {
570 let res = self.send(self.inner.http.post(url).multipart(form)).await?;
571 decode_json(res).await
572 }
573}
574
575// ── Sub-client accessors ─────────────────────────────────────────────────
576
577impl AkribesClient {
578 /// Project management (list, create, update, delete). Global resource.
579 pub fn projects(&self) -> crate::sub::projects::ProjectsClient {
580 crate::sub::projects::ProjectsClient::new(Arc::clone(&self.inner))
581 }
582
583 /// Script execution. Global methods only (get, get_output, get_events,
584 /// cancel, resume, document helpers, await_execution).
585 ///
586 /// For project-scoped methods (run, list, cancel_run, run_from,
587 /// run_with_upload, run_with_s3, get_graph, get_cost), use
588 /// [`AkribesClient::project`] first.
589 pub fn executions(&self) -> crate::sub::executions::ExecutionsClient {
590 crate::sub::executions::ExecutionsClient::new(Arc::clone(&self.inner))
591 }
592
593 /// Scoped token management (mint, list, revoke). Not project-scoped.
594 pub fn tokens(&self) -> crate::sub::tokens::TokensClient {
595 crate::sub::tokens::TokensClient::new(Arc::clone(&self.inner))
596 }
597
598 /// Document conversion via the server's Docling integration.
599 pub fn convert(&self) -> crate::sub::convert::ConvertClient {
600 crate::sub::convert::ConvertClient::new(Arc::clone(&self.inner))
601 }
602
603 /// Global bench-run operations (anything keyed on the cross-project
604 /// `bench_runs.id`): get/delete a run, list results, page events, cancel,
605 /// tag-session, compare two runs, promote an execution to a case. These
606 /// endpoints live at `/bench-runs/{id}/...` and don't need a project
607 /// scope — the server resolves the owning project from the run row.
608 ///
609 /// Project-scoped bench operations (config CRUD, case CRUD, list/trigger
610 /// runs for a script) live on [`ProjectScope::bench`] instead.
611 pub fn bench_runs(&self) -> crate::sub::bench::BenchRunsClient {
612 crate::sub::bench::BenchRunsClient::new(Arc::clone(&self.inner))
613 }
614
615 /// Enter a project scope. The returned [`ProjectScope`] gives infallible
616 /// access to all project-scoped sub-clients (`scripts`, `drafts`,
617 /// `versions`, `channels`, `bench`, `events`, `registered_clients`).
618 ///
619 /// ```no_run
620 /// # use akribes_sdk::AkribesClient;
621 /// # async fn example(client: AkribesClient) -> akribes_sdk::Result<()> {
622 /// let scripts = client.project(1).scripts().list().await?;
623 /// # Ok(()) }
624 /// ```
625 pub fn project(&self, project_id: i64) -> ProjectScope {
626 ProjectScope {
627 inner: Arc::clone(&self.inner),
628 project_id,
629 }
630 }
631
632 /// Shortcut for clients constructed with [`AkribesClientBuilder::project_id`]:
633 /// returns a [`ProjectScope`] using the embedded project id, or
634 /// `Err(MissingProjectId)` if none was set.
635 pub fn scoped(&self) -> Result<ProjectScope> {
636 let pid = self
637 .inner
638 .project_id
639 .ok_or(AkribesError::MissingProjectId)?;
640 Ok(ProjectScope {
641 inner: Arc::clone(&self.inner),
642 project_id: pid,
643 })
644 }
645
646 /// Fetch the global server state (provider env, etc.).
647 /// Does **not** require `project_id`.
648 pub async fn get_state(&self) -> crate::error::Result<serde_json::Value> {
649 let url = format!("{}/state", self.inner.base_url);
650 Ok(self
651 .get_opt::<serde_json::Value>(&url)
652 .await?
653 .unwrap_or(serde_json::json!({})))
654 }
655
656 /// Fetch the caller's per-user sandbox project id (creates one if missing).
657 /// Use this to subscribe to ad-hoc events *before* calling [`run_adhoc`](Self::run_adhoc)
658 /// so the first engine events aren't missed.
659 pub async fn get_sandbox_project_id(&self) -> crate::error::Result<i64> {
660 let url = format!("{}/me/sandbox", self.inner.base_url);
661 let body: crate::models::SandboxProjectIdResponse =
662 self.send(self.inner.http.get(&url)).await?.json().await?;
663 Ok(body.project_id)
664 }
665
666 /// Execute raw `.akr` source ad-hoc. The server runs it in the caller's
667 /// per-user sandbox project and returns `{execution_id, project_id}`.
668 ///
669 /// Equivalent to [`run_adhoc_with`](Self::run_adhoc_with) with `channel`
670 /// and `triggered_by` both `None`.
671 pub async fn run_adhoc(
672 &self,
673 source: &str,
674 inputs: Option<std::collections::HashMap<String, serde_json::Value>>,
675 breakpoint_lines: Option<Vec<usize>>,
676 ) -> crate::error::Result<crate::models::AdhocRunResult> {
677 self.run_adhoc_with(source, inputs, breakpoint_lines, None, None)
678 .await
679 }
680
681 /// Execute raw `.akr` source ad-hoc, with optional `channel` and
682 /// `triggered_by` (#1120). Mirrors Python's `run_adhoc(channel=...,
683 /// triggered_by=...)` and the TS `runAdHoc` opts.
684 ///
685 /// - `channel`: release channel for resolving `use foo` references. When
686 /// `None`, the server applies its default (typically `production`).
687 /// - `triggered_by`: opaque identifier recorded with the execution for
688 /// audit. Common values: `"studio"`, `"bench"`, `"<user_email>"`.
689 pub async fn run_adhoc_with(
690 &self,
691 source: &str,
692 inputs: Option<std::collections::HashMap<String, serde_json::Value>>,
693 breakpoint_lines: Option<Vec<usize>>,
694 channel: Option<&str>,
695 triggered_by: Option<&str>,
696 ) -> crate::error::Result<crate::models::AdhocRunResult> {
697 let url = format!("{}/execute", self.inner.base_url);
698 self.post(
699 &url,
700 &crate::models::AdhocRunRequest {
701 source,
702 inputs,
703 breakpoint_lines,
704 channel,
705 triggered_by,
706 },
707 )
708 .await
709 }
710
711 /// Subscribe to engine events from ad-hoc executions in the given sandbox
712 /// project. Pass the `project_id` returned from [`run_adhoc`](Self::run_adhoc)
713 /// or [`get_sandbox_project_id`](Self::get_sandbox_project_id).
714 ///
715 /// Returns a receiver for [`EngineEvent`]s (filtered to `Execution` variants)
716 /// plus a subscription handle — dropping the handle cancels the stream.
717 ///
718 /// Equivalent to [`adhoc_event_stream_with_ready`](Self::adhoc_event_stream_with_ready)
719 /// with `ready = None`. Use the `_with_ready` variant when you need to
720 /// avoid the subscribe-after-POST race on a fast workflow.
721 pub async fn adhoc_event_stream(
722 &self,
723 project_id: i64,
724 ) -> crate::error::Result<(
725 tokio::sync::mpsc::UnboundedReceiver<crate::models::EngineEvent>,
726 crate::sub::events::EventSubscription,
727 )> {
728 self.adhoc_event_stream_with_ready(project_id, None).await
729 }
730
731 /// Like [`adhoc_event_stream`](Self::adhoc_event_stream), but takes an
732 /// optional [`tokio::sync::Notify`] that fires once the SSE `GET /events`
733 /// response returns a 2xx status — i.e. the moment the server-side
734 /// broadcast subscriber is attached and no events emitted from this point
735 /// on can be missed.
736 ///
737 /// Use this to avoid the subscribe-after-POST race: a fast workflow
738 /// (single-digit milliseconds, mock providers) can emit `NodeStart`,
739 /// `TaskStart`, … before a naive `run_adhoc().then(adhoc_event_stream)`
740 /// has its SSE subscriber registered, and those opening events are then
741 /// dropped by the broadcast channel.
742 ///
743 /// The pattern is **subscribe → await ready → POST**:
744 ///
745 /// ```no_run
746 /// # use akribes_sdk::AkribesClient;
747 /// # use std::sync::Arc;
748 /// # use tokio::sync::Notify;
749 /// # async fn ex(client: AkribesClient, source: &str) -> akribes_sdk::Result<()> {
750 /// let project_id = client.get_sandbox_project_id().await?;
751 /// let ready = Arc::new(Notify::new());
752 /// let (mut rx, _sub) = client
753 /// .adhoc_event_stream_with_ready(project_id, Some(Arc::clone(&ready)))
754 /// .await?;
755 /// ready.notified().await; // SSE attached, safe to POST
756 /// client.run_adhoc(source, None, None).await?;
757 /// while let Some(_event) = rx.recv().await { /* … */ }
758 /// # Ok(()) }
759 /// ```
760 ///
761 /// If the SSE subscription fails (auth error, server down, all retries
762 /// exhausted) the notify is **never fired** — wrap the wait in a timeout
763 /// or join it with the subscription handle to avoid blocking forever.
764 pub async fn adhoc_event_stream_with_ready(
765 &self,
766 project_id: i64,
767 ready: Option<Arc<tokio::sync::Notify>>,
768 ) -> crate::error::Result<(
769 tokio::sync::mpsc::UnboundedReceiver<crate::models::EngineEvent>,
770 crate::sub::events::EventSubscription,
771 )> {
772 use crate::models::HubEvent;
773 use tokio::sync::oneshot;
774 let (hub_tx, mut hub_rx) = tokio::sync::mpsc::unbounded_channel();
775 let (engine_tx, engine_rx) = tokio::sync::mpsc::unbounded_channel();
776
777 // Bridge the underlying oneshot ready-signal to the caller's Notify.
778 // `stream_sse_with_retry` fires the oneshot exactly once with `Ok(())`
779 // on first 2xx, or `Err(_)` if all retries are exhausted; we only
780 // notify the caller on the success path so a `notified().await` paired
781 // with a timeout still surfaces the failure.
782 let (ready_tx, ready_rx) = oneshot::channel::<crate::error::Result<()>>();
783 if let Some(notify) = ready.clone() {
784 tokio::spawn(async move {
785 if let Ok(Ok(())) = ready_rx.await {
786 notify.notify_one();
787 }
788 });
789 } else {
790 tokio::spawn(async move {
791 let _ = ready_rx.await;
792 });
793 }
794
795 let http = self.inner.http.clone();
796 let token = self.inner.token.clone();
797 let base_url = self.inner.base_url.clone();
798 let sse_handle = tokio::spawn(async move {
799 let _ = crate::sub::events::stream_sse_with_retry(
800 http,
801 token,
802 base_url,
803 project_id,
804 Some("adhoc".to_string()),
805 hub_tx,
806 Some(ready_tx),
807 )
808 .await;
809 });
810
811 let filter_handle = tokio::spawn(async move {
812 while let Some(evt) = hub_rx.recv().await {
813 if let HubEvent::Execution { event, .. } = evt {
814 if engine_tx.send(event).is_err() {
815 break;
816 }
817 }
818 }
819 sse_handle.abort();
820 });
821
822 Ok((
823 engine_rx,
824 crate::sub::events::EventSubscription::from_handle(filter_handle),
825 ))
826 }
827}
828
829// ── Project scope ────────────────────────────────────────────────────────────
830
831/// Project-scoped handle to the server. Obtained from
832/// [`AkribesClient::project`]. Cheap to clone (just an `Arc` and an `i64`).
833#[derive(Clone, Debug)]
834pub struct ProjectScope {
835 pub(crate) inner: Arc<Inner>,
836 pub(crate) project_id: i64,
837}
838
839impl ProjectScope {
840 /// The project ID this scope is bound to.
841 pub fn project_id(&self) -> i64 {
842 self.project_id
843 }
844
845 /// Underlying client (e.g. for global operations alongside scoped ones).
846 pub fn client(&self) -> AkribesClient {
847 AkribesClient {
848 inner: Arc::clone(&self.inner),
849 }
850 }
851
852 /// Script management within this project.
853 pub fn scripts(&self) -> crate::sub::scripts::ScriptsClient {
854 crate::sub::scripts::ScriptsClient::new(Arc::clone(&self.inner), self.project_id)
855 }
856
857 /// Script drafts within this project.
858 pub fn drafts(&self) -> crate::sub::drafts::DraftsClient {
859 crate::sub::drafts::DraftsClient::new(Arc::clone(&self.inner), self.project_id)
860 }
861
862 /// Script versions within this project.
863 pub fn versions(&self) -> crate::sub::versions::VersionsClient {
864 crate::sub::versions::VersionsClient::new(Arc::clone(&self.inner), self.project_id)
865 }
866
867 /// Script channels within this project.
868 pub fn channels(&self) -> crate::sub::channels::ChannelsClient {
869 crate::sub::channels::ChannelsClient::new(Arc::clone(&self.inner), self.project_id)
870 }
871
872 /// Project-scoped bench operations: config CRUD, cases, list/trigger runs
873 /// for a given script. Run-scoped operations (anything keyed on
874 /// `bench_runs.id`) live on [`AkribesClient::bench_runs`] instead.
875 pub fn bench(&self) -> crate::sub::bench::BenchClient {
876 crate::sub::bench::BenchClient::new(Arc::clone(&self.inner), self.project_id)
877 }
878
879 /// SSE event streams scoped to this project.
880 pub fn events(&self) -> crate::sub::events::EventsClient {
881 crate::sub::events::EventsClient::new(Arc::clone(&self.inner), self.project_id)
882 }
883
884 /// Client registration scoped to this project.
885 pub fn registered_clients(&self) -> crate::sub::clients::RegisteredClientsClient {
886 crate::sub::clients::RegisteredClientsClient::new(Arc::clone(&self.inner), self.project_id)
887 }
888
889 /// Project-scoped execution operations (run, list, cancel_run, etc.).
890 pub fn executions(&self) -> crate::sub::executions::ScopedExecutionsClient {
891 crate::sub::executions::ScopedExecutionsClient::new(
892 Arc::clone(&self.inner),
893 self.project_id,
894 )
895 }
896
897 /// MCP server/tool discovery for this project.
898 pub fn mcp(&self) -> crate::sub::mcp::McpClient {
899 crate::sub::mcp::McpClient::new(Arc::clone(&self.inner), self.project_id)
900 }
901
902 /// Document ingest (claim + upload) for this project.
903 pub fn documents(&self) -> crate::sub::documents::DocumentsClient {
904 crate::sub::documents::DocumentsClient::new(Arc::clone(&self.inner), self.project_id)
905 }
906
907 /// Project-scoped document conversion. Uploads go to
908 /// `POST /projects/{id}/convert` so the server can enforce project access.
909 pub async fn convert_file(
910 &self,
911 filename: &str,
912 data: Vec<u8>,
913 ) -> Result<crate::models::ConvertResult> {
914 crate::sub::convert::ConvertClient::new(Arc::clone(&self.inner))
915 .convert_file_for_project(self.project_id, filename, data)
916 .await
917 }
918}
919
920impl Drop for AkribesClient {
921 fn drop(&mut self) {
922 // Signal the heartbeat task to stop. The AtomicBool avoids the
923 // TOCTOU race that `Arc::strong_count` had: a clone could appear
924 // between the count check and the abort.
925 if Arc::strong_count(&self.inner) == 1 {
926 self.inner.shutdown.store(true, Ordering::Release);
927 if let Ok(mut h) = self.inner.heartbeat_handle.lock() {
928 if let Some(handle) = h.take() {
929 handle.abort();
930 }
931 }
932 }
933 }
934}
935
936// ── Builder ─────────────────────────────────────────────────────────────────
937
938#[must_use = "a builder does nothing until .build() is called"]
939pub struct AkribesClientBuilder {
940 base_url: String,
941 project_id: Option<i64>,
942 name: Option<String>,
943 id: Option<String>,
944 token: Option<String>,
945 on_behalf_of: Option<String>,
946 http_client: Option<reqwest::Client>,
947 ingest_poll_timeout: Option<Duration>,
948}
949
950impl AkribesClientBuilder {
951 /// Set the project ID. Required for project-scoped operations (scripts,
952 /// executions, etc.). Omit for global-only usage (list_projects, etc.).
953 pub fn project_id(mut self, project_id: i64) -> Self {
954 self.project_id = Some(project_id);
955 self
956 }
957
958 /// Client display name (default: `"rust-sdk"`).
959 pub fn name(mut self, name: impl Into<String>) -> Self {
960 self.name = Some(name.into());
961 self
962 }
963
964 /// Client ID (default: random UUID v4).
965 pub fn id(mut self, id: impl Into<String>) -> Self {
966 self.id = Some(id.into());
967 self
968 }
969
970 /// Initial authentication token. Either:
971 /// - a **service token** (the secret part of `AKRIBES_SERVICE_TOKEN_<NAME>=<scope>:<secret>`), or
972 /// - a **scoped token** of the form `akribes_tk_<...>` (legacy
973 /// `aura_tk_<...>` still accepted) minted via
974 /// [`crate::sub::tokens::TokensClient::mint`].
975 pub fn token(mut self, token: impl Into<String>) -> Self {
976 self.token = Some(token.into());
977 self
978 }
979
980 /// Set the `X-Akribes-User` header sent on every outbound request.
981 ///
982 /// Used by the server for metrics attribution when a backend (typically
983 /// holding a service token) acts on behalf of an end user. **Advisory
984 /// only — does not grant any permission.** Authorization remains based
985 /// on the bearer token's scope.
986 ///
987 /// Mirrors the TS `AkribesClientOptions.onBehalfOf` and Python
988 /// `AkribesClient(on_behalf_of=...)` knobs. Use
989 /// [`AkribesClient::set_on_behalf_of`] to update the value at runtime.
990 ///
991 /// ```no_run
992 /// # use akribes_sdk::AkribesClient;
993 /// let client = AkribesClient::builder("http://localhost:3001")
994 /// .project_id(2)
995 /// .token(std::env::var("AKRIBES_SERVICE_TOKEN").unwrap())
996 /// .on_behalf_of("alice@acme.com")
997 /// .build();
998 /// ```
999 pub fn on_behalf_of(mut self, email: impl Into<String>) -> Self {
1000 self.on_behalf_of = Some(email.into());
1001 self
1002 }
1003
1004 /// Use a pre-configured [`reqwest::Client`] so multiple `AkribesClient`s
1005 /// can share a connection pool, proxy settings, or TLS configuration.
1006 ///
1007 /// If not called, a default `reqwest::Client` is created with a 60s
1008 /// request timeout and a 10s connect timeout.
1009 pub fn http_client(mut self, client: reqwest::Client) -> Self {
1010 self.http_client = Some(client);
1011 self
1012 }
1013
1014 /// Override the deadline `documents().ingest()` waits for a still-converting
1015 /// blob before surfacing [`AkribesError::Transient`].
1016 ///
1017 /// Resolution order at [`build`](Self::build) time:
1018 /// 1. This builder param if set.
1019 /// 2. `AKRIBES_SDK_INGEST_TIMEOUT_SECS` env var (parsed as `u64` seconds;
1020 /// `0` and unparseable values are ignored).
1021 /// 3. [`DEFAULT_INGEST_POLL_TIMEOUT_SECS`] (300 s).
1022 ///
1023 /// Setting this to a very short duration is occasionally useful in tests
1024 /// that want to assert the timeout path. Setting it absurdly long (hours)
1025 /// just shifts the failure mode — the server has its own conversion
1026 /// timeouts.
1027 pub fn ingest_poll_timeout(mut self, timeout: Duration) -> Self {
1028 self.ingest_poll_timeout = Some(timeout);
1029 self
1030 }
1031
1032 /// Build the client.
1033 pub fn build(self) -> AkribesClient {
1034 let ingest_poll_timeout = self
1035 .ingest_poll_timeout
1036 .or_else(ingest_poll_timeout_from_env)
1037 .unwrap_or(Duration::from_secs(DEFAULT_INGEST_POLL_TIMEOUT_SECS));
1038 AkribesClient {
1039 inner: Arc::new(Inner {
1040 base_url: self.base_url,
1041 project_id: self.project_id,
1042 name: self.name.unwrap_or_else(|| "rust-sdk".to_string()),
1043 id: self.id.unwrap_or_else(|| uuid::Uuid::new_v4().to_string()),
1044 http: self.http_client.unwrap_or_else(default_http_client),
1045 token: Arc::new(RwLock::new(self.token)),
1046 on_behalf_of: Arc::new(RwLock::new(self.on_behalf_of)),
1047 heartbeat_handle: Mutex::new(None),
1048 shutdown: Arc::new(AtomicBool::new(false)),
1049 schema_cache: Mutex::new(HashMap::new()),
1050 broken_scripts: Mutex::new(HashSet::new()),
1051 ingest_poll_timeout,
1052 }),
1053 }
1054 }
1055}