Skip to main content

akribes_sdk/
client.rs

//! The main Akribes SDK client.
2use std::collections::{HashMap, HashSet};
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::{Arc, Mutex};
5use std::time::Duration;
6
7use serde::Serialize;
8use tokio::sync::RwLock;
9
10use crate::error::{AkribesError, Result};
11
12// ── Shared state ────────────────────────────────────────────────────────────
13
/// State shared by an [`AkribesClient`], all of its clones, and the
/// sub-clients created from it (each receives a clone of the `Arc<Inner>`).
#[derive(Debug)]
pub(crate) struct Inner {
    /// Server base URL. The constructors in this file strip trailing `/`
    /// before storing it.
    pub(crate) base_url: String,
    /// Project this client is bound to; `None` for a global client.
    pub(crate) project_id: Option<i64>,
    /// Caller-supplied client name.
    /// NOTE(review): presumably used when registering the client with the
    /// server — confirm against the code that reads it.
    pub(crate) name: String,
    /// Caller-supplied client identifier.
    /// NOTE(review): exact semantics are not visible here — confirm.
    pub(crate) id: String,
    /// HTTP client used to build every outbound request.
    pub(crate) http: reqwest::Client,
    /// Bearer token attached to outbound requests; `None` means requests are
    /// sent unauthenticated.
    pub(crate) token: Arc<RwLock<Option<String>>>,
    /// Optional `X-Akribes-User` header value for metrics attribution. Set
    /// when a backend with a service token wants per-end-user accounting.
    /// Advisory only — does not grant any permission.
    pub(crate) on_behalf_of: Arc<RwLock<Option<String>>>,
    /// Abort handle for the heartbeat task; `None` while no handle is stored.
    pub(crate) heartbeat_handle: Mutex<Option<tokio::task::AbortHandle>>,
    /// Set to `true` when the client is being dropped / destroyed so the
    /// heartbeat task can stop cleanly without a TOCTOU race.
    pub(crate) shutdown: Arc<AtomicBool>,
    /// Cached input schemas per script (populated by init()).
    /// NOTE(review): the `(String, String)` pairs look like (input name,
    /// input type) — confirm against the code that fills the cache.
    pub(crate) schema_cache: Mutex<HashMap<String, Vec<(String, String)>>>,
    /// Scripts whose schema has changed since init (marked by SSE events).
    pub(crate) broken_scripts: Mutex<HashSet<String>>,
    /// Maximum time `documents().ingest()` will keep polling a still-converting
    /// blob before surfacing `AkribesError::Transient`. See
    /// [`AkribesClientBuilder::ingest_poll_timeout`].
    pub(crate) ingest_poll_timeout: Duration,
}
39
/// Default whole-request timeout (60 s) applied to the internal HTTP client
/// built by `default_http_client`. Individual requests can still exceed this
/// via their own `reqwest::RequestBuilder::timeout`.
const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(60);
/// Default TCP/TLS connect timeout (10 s) for the internal HTTP client.
const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
45
/// Default poll budget, in seconds, for
/// [`crate::sub::documents::DocumentsClient::ingest`].
///
/// Multi-page real-world PDFs going through the VLM conversion path routinely
/// take 1–5 minutes server-side; the previous 30 s ceiling forced every SDK
/// consumer to handle `AkribesError::Transient` themselves with their own retry
/// loop. 300 s (5 min) covers the long tail comfortably without making a
/// server-side hang invisible.
///
/// Override per-client with [`AkribesClientBuilder::ingest_poll_timeout`], or
/// set `AKRIBES_SDK_INGEST_TIMEOUT_SECS` in the process environment for a global
/// override that takes effect at client construction. The builder param wins
/// over the env var.
pub(crate) const DEFAULT_INGEST_POLL_TIMEOUT_SECS: u64 = 300;
59
/// Resolve the ingest poll timeout from `AKRIBES_SDK_INGEST_TIMEOUT_SECS`.
///
/// The variable is trimmed and parsed as a whole number of seconds. The
/// result is `None` — letting the caller fall back to
/// [`DEFAULT_INGEST_POLL_TIMEOUT_SECS`] — when the variable is unset, is not
/// valid Unicode, does not parse as `u64`, or is `0`. Zero is rejected on
/// purpose: a zero deadline would surface as an immediate `Transient` on the
/// first poll iteration, so a misconfigured value should degrade to the
/// default instead of silently breaking ingest.
pub(crate) fn ingest_poll_timeout_from_env() -> Option<Duration> {
    std::env::var("AKRIBES_SDK_INGEST_TIMEOUT_SECS")
        .ok()
        .and_then(|value| value.trim().parse::<u64>().ok())
        .filter(|&secs| secs != 0)
        .map(Duration::from_secs)
}
75
76/// Build the SDK's default `reqwest::Client` with sensible timeouts so a hung
77/// server can never deadlock the caller. Falls back to `Client::new()` if the
78/// builder somehow fails (shouldn't happen with these static settings).
79fn default_http_client() -> reqwest::Client {
80    reqwest::Client::builder()
81        .timeout(DEFAULT_REQUEST_TIMEOUT)
82        .connect_timeout(DEFAULT_CONNECT_TIMEOUT)
83        .build()
84        .unwrap_or_else(|_| reqwest::Client::new())
85}
86
/// Cap on bytes read from an error response body (64 KiB). A misbehaving
/// or malicious server can return arbitrarily large error bodies, and
/// buffering them all with `.text()` would let a single bad response
/// OOM the SDK consumer. Enforced by `read_body_capped`.
const MAX_ERROR_BODY_BYTES: usize = 64 * 1024;

/// Cap on bytes quoted back in a decode-failure error (512 bytes). Short
/// enough to fit in a single MCP tool response; long enough to eyeball the
/// mismatch. Applied by `body_snippet`.
const DECODE_ERROR_SNIPPET_BYTES: usize = 512;
96
97/// Read a response body lossily, capped at [`MAX_ERROR_BODY_BYTES`].
98/// If the cap is hit, a trailing `… (truncated)` marker is appended.
99async fn read_body_capped(res: reqwest::Response) -> String {
100    use futures::StreamExt;
101    let mut buf: Vec<u8> = Vec::new();
102    let mut truncated = false;
103    let mut stream = res.bytes_stream();
104    while let Some(chunk) = stream.next().await {
105        let Ok(chunk) = chunk else { break };
106        let remaining = MAX_ERROR_BODY_BYTES.saturating_sub(buf.len());
107        if remaining == 0 {
108            truncated = true;
109            break;
110        }
111        let take = chunk.len().min(remaining);
112        buf.extend_from_slice(&chunk[..take]);
113        if take < chunk.len() {
114            truncated = true;
115            break;
116        }
117    }
118    let mut s = String::from_utf8_lossy(&buf).into_owned();
119    if truncated {
120        s.push_str("… (truncated)");
121    }
122    s
123}
124
125/// Produce a short UTF-8-lossy snippet of a response body for error messages.
126fn body_snippet(bytes: &[u8]) -> String {
127    let total = bytes.len();
128    let cut = total.min(DECODE_ERROR_SNIPPET_BYTES);
129    let s: String = String::from_utf8_lossy(&bytes[..cut]).into_owned();
130    if total > cut {
131        format!("{s}… (truncated, {total} bytes total)")
132    } else {
133        s
134    }
135}
136
137/// Buffer a response body and deserialize it as JSON. On decode failure,
138/// returns an [`AkribesError::Other`] that names the target type, the URL,
139/// and a short snippet of the actual body — so callers don't just see
140/// reqwest's opaque "error decoding response body".
141///
142/// 404 responses that reach this helper are converted to [`AkribesError::HttpStatus`]
143/// rather than attempting to decode them as the happy-path body. Callers that
144/// want to treat 404 as absent (GET-by-id, list) should check status *before*
145/// calling this helper — see [`AkribesClient::get_opt`] for the pattern.
146pub(crate) async fn decode_json<T: serde::de::DeserializeOwned>(
147    res: reqwest::Response,
148) -> Result<T> {
149    let status = res.status();
150    let url = res.url().to_string();
151    if status == reqwest::StatusCode::NOT_FOUND {
152        let body = read_body_capped(res).await;
153        let message = if body.trim().is_empty() {
154            format!("HTTP 404 Not Found (url: {url})")
155        } else {
156            format!("HTTP 404 (url: {url}): {body}")
157        };
158        return Err(AkribesError::HttpStatus {
159            status: 404,
160            message,
161        });
162    }
163    let bytes = res.bytes().await?;
164    serde_json::from_slice::<T>(&bytes).map_err(|e| {
165        AkribesError::Other(format!(
166            "failed to decode response from {url} as {ty}: {e}; body: {snippet}",
167            ty = std::any::type_name::<T>(),
168            snippet = body_snippet(&bytes),
169        ))
170    })
171}
172
173// ── AkribesClient ──────────────────────────────────────────────────────────────
174
/// Typed client for the Akribes workflow platform.
///
/// Cheaply cloneable — all clones share the same HTTP client, auth token, and
/// heartbeat task.
///
/// # Authentication
///
/// akribes-server uses two token types. **Pick one** for the `token` field:
///
/// **1. Service token** (long-lived, env var, full-Admin within scope) — for
/// trusted backends. The secret is the part after `:` in
/// `AKRIBES_SERVICE_TOKEN_<NAME>=<scope>:<secret>` from the server's env.
///
/// ```no_run
/// use akribes_sdk::AkribesClient;
/// let client = AkribesClient::builder("https://akribes.example.com")
///     .project_id(2)
///     .token(std::env::var("AKRIBES_SERVICE_TOKEN").unwrap())
///     .build();
/// ```
///
/// **2. Scoped token** (`akribes_tk_...`; the legacy `aura_tk_...` prefix is
/// still accepted) — expires and can be revoked. Use it for browsers, CLIs,
/// or anything you don't want to give a long-lived secret. Mint one via
/// [`AkribesClient::tokens`]'s [`mint`](crate::sub::tokens::TokensClient::mint):
///
/// ```no_run
/// # use akribes_sdk::{AkribesClient, models::{MintTokenRequest, TokenScopes, ProjectScope, TokenRole, WildcardMarker}};
/// # async fn ex(backend: AkribesClient) -> Result<(), Box<dyn std::error::Error>> {
/// // From a backend holding a service token, mint a scoped token for a user:
/// let minted = backend.tokens().mint(&MintTokenRequest {
///     user_email: Some("alice@acme.com".to_string()),
///     scopes: TokenScopes {
///         projects: ProjectScope::Wildcard(WildcardMarker),
///         role: TokenRole::Admin,
///         scripts: None,
///         executions: None,
///         can_mint: false,
///         features: vec![],
///         org_id: None,
///     },
///     expires_in: 8 * 3600, // 8h browser session
///     label: "web-session".to_string(),
/// }).await?;
/// // Ship `minted.token` to the browser.
/// # Ok(()) }
/// ```
///
/// Set `X-Akribes-User` for metrics attribution when a backend acts on behalf
/// of a user (advisory header — does not grant permission) via
/// [`AkribesClientBuilder::on_behalf_of`] at construction or
/// [`AkribesClient::set_on_behalf_of`] at runtime. Servers also accept the
/// legacy `X-Aura-User` form for backwards compat with pre-rebrand clients.
#[derive(Clone, Debug)]
pub struct AkribesClient {
    /// Shared state. The client's only field, so cloning the client is just
    /// an `Arc` clone.
    pub(crate) inner: Arc<Inner>,
}
232
233impl AkribesClient {
234    /// Create a new project-scoped client.
235    ///
236    /// Deprecated: prefer [`AkribesClient::builder`], which is now the blessed
237    /// constructor and supports a wider range of configurations (no project,
238    /// custom http client, etc.).
239    #[deprecated(since = "0.4.0", note = "use AkribesClient::builder(base_url) instead")]
240    pub fn new(base_url: &str, project_id: i64, name: &str, id: &str) -> Self {
241        Self {
242            inner: Arc::new(Inner {
243                base_url: base_url.trim_end_matches('/').to_string(),
244                project_id: Some(project_id),
245                name: name.to_string(),
246                id: id.to_string(),
247                http: default_http_client(),
248                token: Arc::new(RwLock::new(None)),
249                on_behalf_of: Arc::new(RwLock::new(None)),
250                heartbeat_handle: Mutex::new(None),
251                shutdown: Arc::new(AtomicBool::new(false)),
252                schema_cache: Mutex::new(HashMap::new()),
253                broken_scripts: Mutex::new(HashSet::new()),
254                ingest_poll_timeout: ingest_poll_timeout_from_env()
255                    .unwrap_or(Duration::from_secs(DEFAULT_INGEST_POLL_TIMEOUT_SECS)),
256            }),
257        }
258    }
259
260    /// Create a builder for more control over client construction.
261    ///
262    /// ```no_run
263    /// # use akribes_sdk::AkribesClient;
264    /// let client = AkribesClient::builder("http://localhost:3001")
265    ///     .project_id(1)
266    ///     .name("my-service")
267    ///     .token("aura_abc123")
268    ///     .build();
269    /// ```
270    pub fn builder(base_url: impl Into<String>) -> AkribesClientBuilder {
271        AkribesClientBuilder {
272            base_url: base_url.into().trim_end_matches('/').to_string(),
273            project_id: None,
274            name: None,
275            id: None,
276            token: None,
277            on_behalf_of: None,
278            http_client: None,
279            ingest_poll_timeout: None,
280        }
281    }
282
283    /// Update the authentication token at runtime.
284    ///
285    /// Pass `None` to clear the token (requests will be unauthenticated).
286    pub async fn set_token(&self, token: Option<String>) {
287        *self.inner.token.write().await = token;
288    }
289
290    /// Set or clear the `X-Akribes-User` header sent on every outbound
291    /// request, used by the server for metrics attribution when a backend
292    /// (typically holding a service token) acts on behalf of an end user.
293    ///
294    /// **This header is advisory — it does not grant any permission.**
295    /// Authorization remains based on the bearer token's scope. Servers also
296    /// honor the legacy `X-Aura-User` form for backwards compat with
297    /// pre-rebrand clients, but new code should not rely on that.
298    ///
299    /// Mirrors [`AkribesClientBuilder::on_behalf_of`] for runtime updates
300    /// (e.g. when the same long-lived client services many users).
301    pub async fn set_on_behalf_of(&self, email: Option<String>) {
302        *self.inner.on_behalf_of.write().await = email;
303    }
304
305    /// Return the configured project ID, or `None` if this is a global client.
306    pub fn project_id(&self) -> Option<i64> {
307        self.inner.project_id
308    }
309
310    /// Return the base URL this client points at (no trailing slash). Useful
311    /// for callers that need to compose a URL outside the typed sub-clients —
312    /// e.g. the MCP bench tools poll an SSE endpoint as a plain GET to keep
313    /// their wire-shape contract intact.
314    pub fn base_url(&self) -> &str {
315        &self.inner.base_url
316    }
317
318    /// Authenticated raw GET that returns the response body as a generic
319    /// `serde_json::Value`. Goes through the same auth + telemetry pipeline
320    /// as every typed call; only the deserialisation step is generic.
321    ///
322    /// 404 surfaces as [`AkribesError::HttpStatus`] (the typed callers convert
323    /// it to `Ok(None)` via their own wrappers — for the raw form we keep the
324    /// status visible so callers can branch on absence vs. unrelated failures).
325    pub async fn get_json_value(&self, url: &str) -> Result<serde_json::Value> {
326        let res = self.send(self.inner.http.get(url)).await?;
327        if res.status() == reqwest::StatusCode::NOT_FOUND {
328            return Err(AkribesError::HttpStatus {
329                status: 404,
330                message: format!("GET {url} returned 404"),
331            });
332        }
333        decode_json(res).await
334    }
335
336    /// 404-tolerant variant of [`AkribesClient::get_json_value`]. Returns
337    /// `Ok(None)` instead of surfacing the 404 as an error — useful when
338    /// the caller's notion of "absence" is a legitimate response.
339    pub async fn get_json_value_opt(&self, url: &str) -> Result<Option<serde_json::Value>> {
340        let res = self.send(self.inner.http.get(url)).await?;
341        if res.status() == reqwest::StatusCode::NOT_FOUND {
342            return Ok(None);
343        }
344        Ok(Some(decode_json(res).await?))
345    }
346
347    /// Authenticated raw POST with a JSON body, returning the response body
348    /// as a generic `serde_json::Value`. Companion to
349    /// [`AkribesClient::get_json_value`] for endpoints whose response shape
350    /// isn't (yet) typed in [`crate::models`].
351    pub async fn post_json_value(
352        &self,
353        url: &str,
354        body: &serde_json::Value,
355    ) -> Result<serde_json::Value> {
356        let res = self.send(self.inner.http.post(url).json(body)).await?;
357        if res.status().as_u16() == 204 || res.content_length() == Some(0) {
358            return Ok(serde_json::Value::Null);
359        }
360        decode_json(res).await
361    }
362
363    /// Return the configured `documents().ingest()` poll timeout. Useful for
364    /// inspecting the resolved value (builder override → env var → default)
365    /// from tests or diagnostics.
366    pub fn ingest_poll_timeout(&self) -> Duration {
367        self.inner.ingest_poll_timeout
368    }
369
370    // ── Internal helpers ─────────────────────────────────────────────────────
371
372    /// Build a request with the current auth token + on-behalf-of header
373    /// injected.
374    pub(crate) async fn authed(&self, builder: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
375        let token_guard = self.inner.token.read().await;
376        let mut builder = match token_guard.as_deref() {
377            Some(t) => builder.bearer_auth(t),
378            None => builder,
379        };
380        drop(token_guard);
381        let obo_guard = self.inner.on_behalf_of.read().await;
382        if let Some(email) = obo_guard.as_deref() {
383            // Servers also honor the legacy `X-Aura-User` form for backwards
384            // compat with pre-rebrand clients, but new clients emit the
385            // Akribes form. Header is advisory only — authz comes from the
386            // bearer token's scope.
387            builder = builder.header("X-Akribes-User", email);
388        }
389        builder
390    }
391
392    /// Send a request, classifying 4xx/5xx status codes into typed errors.
393    pub(crate) async fn send(&self, req: reqwest::RequestBuilder) -> Result<reqwest::Response> {
394        let req = self.authed(req).await;
395        let res = req.send().await?;
396        let status = res.status();
397
398        if status == reqwest::StatusCode::UNAUTHORIZED || status == reqwest::StatusCode::FORBIDDEN {
399            let body = read_body_capped(res).await;
400            let message = if body.trim().is_empty() {
401                format!(
402                    "HTTP {} {}",
403                    status.as_u16(),
404                    status.canonical_reason().unwrap_or("")
405                )
406            } else {
407                format!("HTTP {}: {}", status.as_u16(), body)
408            };
409            return Err(AkribesError::Fatal {
410                message,
411                execution_id: None,
412            });
413        }
414
415        // #1296: all 4 transient 5xx statuses get the same dispatch path —
416        // the per-status retry semantics are exposed by the new
417        // `status` field on `AkribesError::Transient` so callers can
418        // pick a base backoff via `AkribesError::recommended_backoff_ms`.
419        if status.as_u16() == 429
420            || status == reqwest::StatusCode::INTERNAL_SERVER_ERROR
421            || status == reqwest::StatusCode::BAD_GATEWAY
422            || status == reqwest::StatusCode::SERVICE_UNAVAILABLE
423            || status == reqwest::StatusCode::GATEWAY_TIMEOUT
424        {
425            // Parse Retry-After before consuming the response (#1009).
426            // Numeric seconds only; HTTP-date form is ignored to match the
427            // Python SDK's behavior.
428            let retry_after = res
429                .headers()
430                .get(reqwest::header::RETRY_AFTER)
431                .and_then(|v| v.to_str().ok())
432                .and_then(|s| s.trim().parse::<u64>().ok())
433                .map(std::time::Duration::from_secs);
434            let body = read_body_capped(res).await;
435            let message = if body.trim().is_empty() {
436                format!(
437                    "HTTP {} {}",
438                    status.as_u16(),
439                    status.canonical_reason().unwrap_or("")
440                )
441            } else {
442                format!("HTTP {}: {}", status.as_u16(), body)
443            };
444            return Err(AkribesError::Transient {
445                message,
446                execution_id: None,
447                retry_after,
448                status: Some(status.as_u16()),
449            });
450        }
451
452        if !status.is_success() && status != reqwest::StatusCode::NOT_FOUND {
453            let msg = read_body_capped(res).await;
454            if status.as_u16() == 409 {
455                if let Ok(body) = serde_json::from_str::<serde_json::Value>(&msg) {
456                    if body.get("error_type").and_then(|v| v.as_str())
457                        == Some("suite_already_exists")
458                    {
459                        if let Some(id) = body.get("existing_suite_id").and_then(|v| v.as_i64()) {
460                            return Err(AkribesError::AlreadyExists {
461                                message: body
462                                    .get("error")
463                                    .and_then(|v| v.as_str())
464                                    .unwrap_or("already exists")
465                                    .to_string(),
466                                existing_id: id,
467                            });
468                        }
469                    }
470                }
471            }
472            return Err(AkribesError::HttpStatus {
473                status: status.as_u16(),
474                message: msg,
475            });
476        }
477
478        Ok(res)
479    }
480
481    /// `GET` — returns `None` on 404, deserialises body otherwise.
482    pub(crate) async fn get_opt<T: serde::de::DeserializeOwned>(
483        &self,
484        url: &str,
485    ) -> Result<Option<T>> {
486        let res = self.send(self.inner.http.get(url)).await?;
487        if res.status() == reqwest::StatusCode::NOT_FOUND {
488            return Ok(None);
489        }
490        Ok(Some(decode_json(res).await?))
491    }
492
493    /// `GET` — deserialises body as a list, treats 404 as empty.
494    pub(crate) async fn get_list<T: serde::de::DeserializeOwned>(
495        &self,
496        url: &str,
497    ) -> Result<Vec<T>> {
498        Ok(self.get_opt::<Vec<T>>(url).await?.unwrap_or_default())
499    }
500
501    /// Append a serde-serializable query struct to a URL. Fields set to
502    /// `None` are skipped (via `#[serde(skip_serializing_if = "Option::is_none")]`).
503    pub(crate) fn url_with_query<Q: Serialize>(base: &str, q: &Q) -> String {
504        match serde_urlencoded::to_string(q) {
505            Ok(qs) if !qs.is_empty() => format!("{base}?{qs}"),
506            _ => base.to_string(),
507        }
508    }
509
510    /// `POST` with a JSON body — returns deserialised response.
511    pub(crate) async fn post<B: Serialize, T: serde::de::DeserializeOwned>(
512        &self,
513        url: &str,
514        body: &B,
515    ) -> Result<T> {
516        let res = self.send(self.inner.http.post(url).json(body)).await?;
517        decode_json(res).await
518    }
519
520    /// `PATCH` with a JSON body — returns deserialised response.
521    pub(crate) async fn patch<B: Serialize, T: serde::de::DeserializeOwned>(
522        &self,
523        url: &str,
524        body: &B,
525    ) -> Result<T> {
526        let res = self.send(self.inner.http.patch(url).json(body)).await?;
527        decode_json(res).await
528    }
529
530    /// `PATCH` with a JSON body — expects no body in the response.
531    pub(crate) async fn patch_empty<B: Serialize>(&self, url: &str, body: &B) -> Result<()> {
532        self.send(self.inner.http.patch(url).json(body)).await?;
533        Ok(())
534    }
535
536    /// `PUT` with a JSON body — expects no body in the response.
537    pub(crate) async fn put_empty<B: Serialize>(&self, url: &str, body: &B) -> Result<()> {
538        self.send(self.inner.http.put(url).json(body)).await?;
539        Ok(())
540    }
541
542    /// `PUT` with a JSON body — returns deserialised response.
543    pub(crate) async fn put_json<B: Serialize, T: serde::de::DeserializeOwned>(
544        &self,
545        url: &str,
546        body: &B,
547    ) -> Result<T> {
548        let res = self.send(self.inner.http.put(url).json(body)).await?;
549        decode_json(res).await
550    }
551
552    /// `DELETE` — returns `true` if deleted, `false` if already absent (404).
553    pub(crate) async fn delete(&self, url: &str) -> Result<bool> {
554        let res = self.send(self.inner.http.delete(url)).await?;
555        Ok(res.status() != reqwest::StatusCode::NOT_FOUND)
556    }
557
558    /// `DELETE` — returns deserialised response body.
559    pub(crate) async fn delete_json<T: serde::de::DeserializeOwned>(&self, url: &str) -> Result<T> {
560        let res = self.send(self.inner.http.delete(url)).await?;
561        decode_json(res).await
562    }
563
564    /// `POST` with a multipart form body — returns deserialised response.
565    pub(crate) async fn post_multipart<T: serde::de::DeserializeOwned>(
566        &self,
567        url: &str,
568        form: reqwest::multipart::Form,
569    ) -> Result<T> {
570        let res = self.send(self.inner.http.post(url).multipart(form)).await?;
571        decode_json(res).await
572    }
573}
574
575// ── Sub-client accessors ─────────────────────────────────────────────────
576
577impl AkribesClient {
578    /// Project management (list, create, update, delete). Global resource.
579    pub fn projects(&self) -> crate::sub::projects::ProjectsClient {
580        crate::sub::projects::ProjectsClient::new(Arc::clone(&self.inner))
581    }
582
583    /// Script execution. Global methods only (get, get_output, get_events,
584    /// cancel, resume, document helpers, await_execution).
585    ///
586    /// For project-scoped methods (run, list, cancel_run, run_from,
587    /// run_with_upload, run_with_s3, get_graph, get_cost), use
588    /// [`AkribesClient::project`] first.
589    pub fn executions(&self) -> crate::sub::executions::ExecutionsClient {
590        crate::sub::executions::ExecutionsClient::new(Arc::clone(&self.inner))
591    }
592
593    /// Scoped token management (mint, list, revoke). Not project-scoped.
594    pub fn tokens(&self) -> crate::sub::tokens::TokensClient {
595        crate::sub::tokens::TokensClient::new(Arc::clone(&self.inner))
596    }
597
598    /// Document conversion via the server's Docling integration.
599    pub fn convert(&self) -> crate::sub::convert::ConvertClient {
600        crate::sub::convert::ConvertClient::new(Arc::clone(&self.inner))
601    }
602
603    /// Global bench-run operations (anything keyed on the cross-project
604    /// `bench_runs.id`): get/delete a run, list results, page events, cancel,
605    /// tag-session, compare two runs, promote an execution to a case. These
606    /// endpoints live at `/bench-runs/{id}/...` and don't need a project
607    /// scope — the server resolves the owning project from the run row.
608    ///
609    /// Project-scoped bench operations (config CRUD, case CRUD, list/trigger
610    /// runs for a script) live on [`ProjectScope::bench`] instead.
611    pub fn bench_runs(&self) -> crate::sub::bench::BenchRunsClient {
612        crate::sub::bench::BenchRunsClient::new(Arc::clone(&self.inner))
613    }
614
615    /// Enter a project scope. The returned [`ProjectScope`] gives infallible
616    /// access to all project-scoped sub-clients (`scripts`, `drafts`,
617    /// `versions`, `channels`, `bench`, `events`, `registered_clients`).
618    ///
619    /// ```no_run
620    /// # use akribes_sdk::AkribesClient;
621    /// # async fn example(client: AkribesClient) -> akribes_sdk::Result<()> {
622    /// let scripts = client.project(1).scripts().list().await?;
623    /// # Ok(()) }
624    /// ```
625    pub fn project(&self, project_id: i64) -> ProjectScope {
626        ProjectScope {
627            inner: Arc::clone(&self.inner),
628            project_id,
629        }
630    }
631
632    /// Shortcut for clients constructed with [`AkribesClientBuilder::project_id`]:
633    /// returns a [`ProjectScope`] using the embedded project id, or
634    /// `Err(MissingProjectId)` if none was set.
635    pub fn scoped(&self) -> Result<ProjectScope> {
636        let pid = self
637            .inner
638            .project_id
639            .ok_or(AkribesError::MissingProjectId)?;
640        Ok(ProjectScope {
641            inner: Arc::clone(&self.inner),
642            project_id: pid,
643        })
644    }
645
646    /// Fetch the global server state (provider env, etc.).
647    /// Does **not** require `project_id`.
648    pub async fn get_state(&self) -> crate::error::Result<serde_json::Value> {
649        let url = format!("{}/state", self.inner.base_url);
650        Ok(self
651            .get_opt::<serde_json::Value>(&url)
652            .await?
653            .unwrap_or(serde_json::json!({})))
654    }
655
656    /// Fetch the caller's per-user sandbox project id (creates one if missing).
657    /// Use this to subscribe to ad-hoc events *before* calling [`run_adhoc`](Self::run_adhoc)
658    /// so the first engine events aren't missed.
659    pub async fn get_sandbox_project_id(&self) -> crate::error::Result<i64> {
660        let url = format!("{}/me/sandbox", self.inner.base_url);
661        let body: crate::models::SandboxProjectIdResponse =
662            self.send(self.inner.http.get(&url)).await?.json().await?;
663        Ok(body.project_id)
664    }
665
666    /// Execute raw `.akr` source ad-hoc. The server runs it in the caller's
667    /// per-user sandbox project and returns `{execution_id, project_id}`.
668    ///
669    /// Equivalent to [`run_adhoc_with`](Self::run_adhoc_with) with `channel`
670    /// and `triggered_by` both `None`.
671    pub async fn run_adhoc(
672        &self,
673        source: &str,
674        inputs: Option<std::collections::HashMap<String, serde_json::Value>>,
675        breakpoint_lines: Option<Vec<usize>>,
676    ) -> crate::error::Result<crate::models::AdhocRunResult> {
677        self.run_adhoc_with(source, inputs, breakpoint_lines, None, None)
678            .await
679    }
680
681    /// Execute raw `.akr` source ad-hoc, with optional `channel` and
682    /// `triggered_by` (#1120). Mirrors Python's `run_adhoc(channel=...,
683    /// triggered_by=...)` and the TS `runAdHoc` opts.
684    ///
685    /// - `channel`: release channel for resolving `use foo` references. When
686    ///   `None`, the server applies its default (typically `production`).
687    /// - `triggered_by`: opaque identifier recorded with the execution for
688    ///   audit. Common values: `"studio"`, `"bench"`, `"<user_email>"`.
689    pub async fn run_adhoc_with(
690        &self,
691        source: &str,
692        inputs: Option<std::collections::HashMap<String, serde_json::Value>>,
693        breakpoint_lines: Option<Vec<usize>>,
694        channel: Option<&str>,
695        triggered_by: Option<&str>,
696    ) -> crate::error::Result<crate::models::AdhocRunResult> {
697        let url = format!("{}/execute", self.inner.base_url);
698        self.post(
699            &url,
700            &crate::models::AdhocRunRequest {
701                source,
702                inputs,
703                breakpoint_lines,
704                channel,
705                triggered_by,
706            },
707        )
708        .await
709    }
710
711    /// Subscribe to engine events from ad-hoc executions in the given sandbox
712    /// project. Pass the `project_id` returned from [`run_adhoc`](Self::run_adhoc)
713    /// or [`get_sandbox_project_id`](Self::get_sandbox_project_id).
714    ///
715    /// Returns a receiver for [`EngineEvent`]s (filtered to `Execution` variants)
716    /// plus a subscription handle — dropping the handle cancels the stream.
717    ///
718    /// Equivalent to [`adhoc_event_stream_with_ready`](Self::adhoc_event_stream_with_ready)
719    /// with `ready = None`. Use the `_with_ready` variant when you need to
720    /// avoid the subscribe-after-POST race on a fast workflow.
721    pub async fn adhoc_event_stream(
722        &self,
723        project_id: i64,
724    ) -> crate::error::Result<(
725        tokio::sync::mpsc::UnboundedReceiver<crate::models::EngineEvent>,
726        crate::sub::events::EventSubscription,
727    )> {
728        self.adhoc_event_stream_with_ready(project_id, None).await
729    }
730
731    /// Like [`adhoc_event_stream`](Self::adhoc_event_stream), but takes an
732    /// optional [`tokio::sync::Notify`] that fires once the SSE `GET /events`
733    /// response returns a 2xx status — i.e. the moment the server-side
734    /// broadcast subscriber is attached and no events emitted from this point
735    /// on can be missed.
736    ///
737    /// Use this to avoid the subscribe-after-POST race: a fast workflow
738    /// (single-digit milliseconds, mock providers) can emit `NodeStart`,
739    /// `TaskStart`, … before a naive `run_adhoc().then(adhoc_event_stream)`
740    /// has its SSE subscriber registered, and those opening events are then
741    /// dropped by the broadcast channel.
742    ///
743    /// The pattern is **subscribe → await ready → POST**:
744    ///
745    /// ```no_run
746    /// # use akribes_sdk::AkribesClient;
747    /// # use std::sync::Arc;
748    /// # use tokio::sync::Notify;
749    /// # async fn ex(client: AkribesClient, source: &str) -> akribes_sdk::Result<()> {
750    /// let project_id = client.get_sandbox_project_id().await?;
751    /// let ready = Arc::new(Notify::new());
752    /// let (mut rx, _sub) = client
753    ///     .adhoc_event_stream_with_ready(project_id, Some(Arc::clone(&ready)))
754    ///     .await?;
755    /// ready.notified().await;            // SSE attached, safe to POST
756    /// client.run_adhoc(source, None, None).await?;
757    /// while let Some(_event) = rx.recv().await { /* … */ }
758    /// # Ok(()) }
759    /// ```
760    ///
761    /// If the SSE subscription fails (auth error, server down, all retries
762    /// exhausted) the notify is **never fired** — wrap the wait in a timeout
763    /// or join it with the subscription handle to avoid blocking forever.
764    pub async fn adhoc_event_stream_with_ready(
765        &self,
766        project_id: i64,
767        ready: Option<Arc<tokio::sync::Notify>>,
768    ) -> crate::error::Result<(
769        tokio::sync::mpsc::UnboundedReceiver<crate::models::EngineEvent>,
770        crate::sub::events::EventSubscription,
771    )> {
772        use crate::models::HubEvent;
773        use tokio::sync::oneshot;
774        let (hub_tx, mut hub_rx) = tokio::sync::mpsc::unbounded_channel();
775        let (engine_tx, engine_rx) = tokio::sync::mpsc::unbounded_channel();
776
777        // Bridge the underlying oneshot ready-signal to the caller's Notify.
778        // `stream_sse_with_retry` fires the oneshot exactly once with `Ok(())`
779        // on first 2xx, or `Err(_)` if all retries are exhausted; we only
780        // notify the caller on the success path so a `notified().await` paired
781        // with a timeout still surfaces the failure.
782        let (ready_tx, ready_rx) = oneshot::channel::<crate::error::Result<()>>();
783        if let Some(notify) = ready.clone() {
784            tokio::spawn(async move {
785                if let Ok(Ok(())) = ready_rx.await {
786                    notify.notify_one();
787                }
788            });
789        } else {
790            tokio::spawn(async move {
791                let _ = ready_rx.await;
792            });
793        }
794
795        let http = self.inner.http.clone();
796        let token = self.inner.token.clone();
797        let base_url = self.inner.base_url.clone();
798        let sse_handle = tokio::spawn(async move {
799            let _ = crate::sub::events::stream_sse_with_retry(
800                http,
801                token,
802                base_url,
803                project_id,
804                Some("adhoc".to_string()),
805                hub_tx,
806                Some(ready_tx),
807            )
808            .await;
809        });
810
811        let filter_handle = tokio::spawn(async move {
812            while let Some(evt) = hub_rx.recv().await {
813                if let HubEvent::Execution { event, .. } = evt {
814                    if engine_tx.send(event).is_err() {
815                        break;
816                    }
817                }
818            }
819            sse_handle.abort();
820        });
821
822        Ok((
823            engine_rx,
824            crate::sub::events::EventSubscription::from_handle(filter_handle),
825        ))
826    }
827}
828
829// ── Project scope ────────────────────────────────────────────────────────────
830
831/// Project-scoped handle to the server. Obtained from
832/// [`AkribesClient::project`]. Cheap to clone (just an `Arc` and an `i64`).
833#[derive(Clone, Debug)]
834pub struct ProjectScope {
835    pub(crate) inner: Arc<Inner>,
836    pub(crate) project_id: i64,
837}
838
839impl ProjectScope {
840    /// The project ID this scope is bound to.
841    pub fn project_id(&self) -> i64 {
842        self.project_id
843    }
844
845    /// Underlying client (e.g. for global operations alongside scoped ones).
846    pub fn client(&self) -> AkribesClient {
847        AkribesClient {
848            inner: Arc::clone(&self.inner),
849        }
850    }
851
852    /// Script management within this project.
853    pub fn scripts(&self) -> crate::sub::scripts::ScriptsClient {
854        crate::sub::scripts::ScriptsClient::new(Arc::clone(&self.inner), self.project_id)
855    }
856
857    /// Script drafts within this project.
858    pub fn drafts(&self) -> crate::sub::drafts::DraftsClient {
859        crate::sub::drafts::DraftsClient::new(Arc::clone(&self.inner), self.project_id)
860    }
861
862    /// Script versions within this project.
863    pub fn versions(&self) -> crate::sub::versions::VersionsClient {
864        crate::sub::versions::VersionsClient::new(Arc::clone(&self.inner), self.project_id)
865    }
866
867    /// Script channels within this project.
868    pub fn channels(&self) -> crate::sub::channels::ChannelsClient {
869        crate::sub::channels::ChannelsClient::new(Arc::clone(&self.inner), self.project_id)
870    }
871
872    /// Project-scoped bench operations: config CRUD, cases, list/trigger runs
873    /// for a given script. Run-scoped operations (anything keyed on
874    /// `bench_runs.id`) live on [`AkribesClient::bench_runs`] instead.
875    pub fn bench(&self) -> crate::sub::bench::BenchClient {
876        crate::sub::bench::BenchClient::new(Arc::clone(&self.inner), self.project_id)
877    }
878
879    /// SSE event streams scoped to this project.
880    pub fn events(&self) -> crate::sub::events::EventsClient {
881        crate::sub::events::EventsClient::new(Arc::clone(&self.inner), self.project_id)
882    }
883
884    /// Client registration scoped to this project.
885    pub fn registered_clients(&self) -> crate::sub::clients::RegisteredClientsClient {
886        crate::sub::clients::RegisteredClientsClient::new(Arc::clone(&self.inner), self.project_id)
887    }
888
889    /// Project-scoped execution operations (run, list, cancel_run, etc.).
890    pub fn executions(&self) -> crate::sub::executions::ScopedExecutionsClient {
891        crate::sub::executions::ScopedExecutionsClient::new(
892            Arc::clone(&self.inner),
893            self.project_id,
894        )
895    }
896
897    /// MCP server/tool discovery for this project.
898    pub fn mcp(&self) -> crate::sub::mcp::McpClient {
899        crate::sub::mcp::McpClient::new(Arc::clone(&self.inner), self.project_id)
900    }
901
902    /// Document ingest (claim + upload) for this project.
903    pub fn documents(&self) -> crate::sub::documents::DocumentsClient {
904        crate::sub::documents::DocumentsClient::new(Arc::clone(&self.inner), self.project_id)
905    }
906
907    /// Project-scoped document conversion. Uploads go to
908    /// `POST /projects/{id}/convert` so the server can enforce project access.
909    pub async fn convert_file(
910        &self,
911        filename: &str,
912        data: Vec<u8>,
913    ) -> Result<crate::models::ConvertResult> {
914        crate::sub::convert::ConvertClient::new(Arc::clone(&self.inner))
915            .convert_file_for_project(self.project_id, filename, data)
916            .await
917    }
918}
919
920impl Drop for AkribesClient {
921    fn drop(&mut self) {
922        // Signal the heartbeat task to stop. The AtomicBool avoids the
923        // TOCTOU race that `Arc::strong_count` had: a clone could appear
924        // between the count check and the abort.
925        if Arc::strong_count(&self.inner) == 1 {
926            self.inner.shutdown.store(true, Ordering::Release);
927            if let Ok(mut h) = self.inner.heartbeat_handle.lock() {
928                if let Some(handle) = h.take() {
929                    handle.abort();
930                }
931            }
932        }
933    }
934}
935
936// ── Builder ─────────────────────────────────────────────────────────────────
937
938#[must_use = "a builder does nothing until .build() is called"]
939pub struct AkribesClientBuilder {
940    base_url: String,
941    project_id: Option<i64>,
942    name: Option<String>,
943    id: Option<String>,
944    token: Option<String>,
945    on_behalf_of: Option<String>,
946    http_client: Option<reqwest::Client>,
947    ingest_poll_timeout: Option<Duration>,
948}
949
950impl AkribesClientBuilder {
951    /// Set the project ID. Required for project-scoped operations (scripts,
952    /// executions, etc.). Omit for global-only usage (list_projects, etc.).
953    pub fn project_id(mut self, project_id: i64) -> Self {
954        self.project_id = Some(project_id);
955        self
956    }
957
958    /// Client display name (default: `"rust-sdk"`).
959    pub fn name(mut self, name: impl Into<String>) -> Self {
960        self.name = Some(name.into());
961        self
962    }
963
964    /// Client ID (default: random UUID v4).
965    pub fn id(mut self, id: impl Into<String>) -> Self {
966        self.id = Some(id.into());
967        self
968    }
969
970    /// Initial authentication token. Either:
971    /// - a **service token** (the secret part of `AKRIBES_SERVICE_TOKEN_<NAME>=<scope>:<secret>`), or
972    /// - a **scoped token** of the form `akribes_tk_<...>` (legacy
973    ///   `aura_tk_<...>` still accepted) minted via
974    ///   [`crate::sub::tokens::TokensClient::mint`].
975    pub fn token(mut self, token: impl Into<String>) -> Self {
976        self.token = Some(token.into());
977        self
978    }
979
980    /// Set the `X-Akribes-User` header sent on every outbound request.
981    ///
982    /// Used by the server for metrics attribution when a backend (typically
983    /// holding a service token) acts on behalf of an end user. **Advisory
984    /// only — does not grant any permission.** Authorization remains based
985    /// on the bearer token's scope.
986    ///
987    /// Mirrors the TS `AkribesClientOptions.onBehalfOf` and Python
988    /// `AkribesClient(on_behalf_of=...)` knobs. Use
989    /// [`AkribesClient::set_on_behalf_of`] to update the value at runtime.
990    ///
991    /// ```no_run
992    /// # use akribes_sdk::AkribesClient;
993    /// let client = AkribesClient::builder("http://localhost:3001")
994    ///     .project_id(2)
995    ///     .token(std::env::var("AKRIBES_SERVICE_TOKEN").unwrap())
996    ///     .on_behalf_of("alice@acme.com")
997    ///     .build();
998    /// ```
999    pub fn on_behalf_of(mut self, email: impl Into<String>) -> Self {
1000        self.on_behalf_of = Some(email.into());
1001        self
1002    }
1003
1004    /// Use a pre-configured [`reqwest::Client`] so multiple `AkribesClient`s
1005    /// can share a connection pool, proxy settings, or TLS configuration.
1006    ///
1007    /// If not called, a default `reqwest::Client` is created with a 60s
1008    /// request timeout and a 10s connect timeout.
1009    pub fn http_client(mut self, client: reqwest::Client) -> Self {
1010        self.http_client = Some(client);
1011        self
1012    }
1013
1014    /// Override the deadline `documents().ingest()` waits for a still-converting
1015    /// blob before surfacing [`AkribesError::Transient`].
1016    ///
1017    /// Resolution order at [`build`](Self::build) time:
1018    /// 1. This builder param if set.
1019    /// 2. `AKRIBES_SDK_INGEST_TIMEOUT_SECS` env var (parsed as `u64` seconds;
1020    ///    `0` and unparseable values are ignored).
1021    /// 3. [`DEFAULT_INGEST_POLL_TIMEOUT_SECS`] (300 s).
1022    ///
1023    /// Setting this to a very short duration is occasionally useful in tests
1024    /// that want to assert the timeout path. Setting it absurdly long (hours)
1025    /// just shifts the failure mode — the server has its own conversion
1026    /// timeouts.
1027    pub fn ingest_poll_timeout(mut self, timeout: Duration) -> Self {
1028        self.ingest_poll_timeout = Some(timeout);
1029        self
1030    }
1031
1032    /// Build the client.
1033    pub fn build(self) -> AkribesClient {
1034        let ingest_poll_timeout = self
1035            .ingest_poll_timeout
1036            .or_else(ingest_poll_timeout_from_env)
1037            .unwrap_or(Duration::from_secs(DEFAULT_INGEST_POLL_TIMEOUT_SECS));
1038        AkribesClient {
1039            inner: Arc::new(Inner {
1040                base_url: self.base_url,
1041                project_id: self.project_id,
1042                name: self.name.unwrap_or_else(|| "rust-sdk".to_string()),
1043                id: self.id.unwrap_or_else(|| uuid::Uuid::new_v4().to_string()),
1044                http: self.http_client.unwrap_or_else(default_http_client),
1045                token: Arc::new(RwLock::new(self.token)),
1046                on_behalf_of: Arc::new(RwLock::new(self.on_behalf_of)),
1047                heartbeat_handle: Mutex::new(None),
1048                shutdown: Arc::new(AtomicBool::new(false)),
1049                schema_cache: Mutex::new(HashMap::new()),
1050                broken_scripts: Mutex::new(HashSet::new()),
1051                ingest_poll_timeout,
1052            }),
1053        }
1054    }
1055}