Skip to main content

studio_worker/
http.rs

1//! Thin reqwest wrapper around the studio API.
2//!
3//! Every call goes through [`ApiClient::check`], which:
4//!
5//! - emits a structured `tracing` event on success (`debug`) and
6//!   failure (`warn`) so operators can see what the worker is talking
7//!   to without having to enable wire-level logging in reqwest, and
8//! - turns non-2xx responses into an `anyhow` error tagged with the
9//!   operation name so the existing log shipper messages stay legible.
10use crate::types::*;
11use anyhow::{anyhow, Context, Result};
12use reqwest::blocking::{Client, Response};
13use std::time::{Duration, Instant};
14use tracing::{debug, warn};
15
/// Path prefix shared by every worker endpoint; it is inserted between
/// the client's base URL and the per-endpoint path.
const API_PREFIX: &str = "/graphics/api";
18
/// `tracing` target attached to every event this HTTP client emits.
/// Because the value never changes, operators can enable just these
/// events with `RUST_LOG=studio_worker::http=debug` and leave the rest
/// of the agent's logging configuration alone.
const TRACE_TARGET: &str = "studio_worker::http";
24
25pub struct ApiClient {
26    pub base_url: String,
27    pub client: Client,
28}
29
30impl ApiClient {
31    pub fn new(base_url: String) -> Result<Self> {
32        let client = Client::builder()
33            .timeout(Duration::from_secs(60))
34            .build()
35            .context("building reqwest client")?;
36        Ok(Self {
37            base_url: base_url.trim_end_matches('/').to_string(),
38            client,
39        })
40    }
41
42    fn url(&self, path: &str) -> String {
43        format!("{}{}{}", self.base_url, API_PREFIX, path)
44    }
45
46    /// Inspect a response, log it, and convert non-2xx into an
47    /// `anyhow` error.  `op` is the human-readable operation name used
48    /// in the error message (kept stable for log-shipper consumers and
49    /// existing tests).
50    fn check(&self, op: &str, url: &str, started: Instant, response: Response) -> Result<Response> {
51        let status = response.status();
52        let elapsed_ms = started.elapsed().as_millis() as u64;
53        if status.is_success() || status.as_u16() == 204 {
54            debug!(
55                target: TRACE_TARGET,
56                op,
57                endpoint = %url,
58                status = status.as_u16(),
59                elapsed_ms,
60                "ok"
61            );
62            return Ok(response);
63        }
64        // Body read consumes the response; we only need it on the
65        // failure path.
66        let body = response.text().unwrap_or_default();
67        warn!(
68            target: TRACE_TARGET,
69            op,
70            endpoint = %url,
71            status = status.as_u16(),
72            elapsed_ms,
73            body = %body,
74            "{op} failed"
75        );
76        Err(anyhow!("{op} failed: {status} — {body}"))
77    }
78
79    // -----------------------------------------------------------------------
80    // Auto-register (operator-approved) flow
81    // -----------------------------------------------------------------------
82
83    /// Create a Pending Workers row.  Unauthenticated on purpose —
84    /// the studio rate-limits this endpoint by source IP and the
85    /// operator manually approves before the worker can do anything.
86    pub fn register_request(
87        &self,
88        payload: &AutoRegisterRequest,
89    ) -> Result<AutoRegisterRequestResponse> {
90        let url = self.url("/workers/register-request");
91        let started = Instant::now();
92        let response = self.client.post(&url).json(payload).send()?;
93        let response = self.check("register-request", &url, started, response)?;
94        Ok(response.json()?)
95    }
96
97    /// Poll the studio for the operator's decision on a previously
98    /// submitted register-request.  Returns `Ok(None)` when the
99    /// request id is unknown to the studio (likely cleaned up or
100    /// never existed) so the orchestrator can drop the stale id and
101    /// start a fresh one.  Auth is the raw `registration_secret`
102    /// presented as a Bearer token.
103    pub fn poll_register_status(
104        &self,
105        request_id: &str,
106        registration_secret: &str,
107    ) -> Result<Option<RegisterStatus>> {
108        let url = self.url(&format!("/workers/register-requests/{request_id}"));
109        let started = Instant::now();
110        let response = self
111            .client
112            .get(&url)
113            .bearer_auth(registration_secret)
114            .send()?;
115        if response.status().as_u16() == 404 {
116            debug!(
117                target: TRACE_TARGET,
118                op = "register-poll",
119                endpoint = %url,
120                status = 404,
121                elapsed_ms = started.elapsed().as_millis() as u64,
122                "register request not found (stale id; orchestrator will recreate)"
123            );
124            return Ok(None);
125        }
126        let response = self.check("register-poll", &url, started, response)?;
127        Ok(Some(response.json()?))
128    }
129
130    /// Complete a job with binary output (image / audio / video).
131    ///
132    /// This is the only worker-side HTTP route that survives the WS
133    /// migration: R2 multipart doesn't fit cleanly into WS frames.
134    /// Heartbeats, claim/accept/reject, completeJson, fail, and log
135    /// shipping all flow over the WS session owned by
136    /// `ws::session::spawn_ws_session`.
137    pub fn complete(
138        &self,
139        worker_id: &str,
140        token: &str,
141        job_id: &str,
142        ext: &str,
143        prompt: &str,
144        image: Vec<u8>,
145    ) -> Result<()> {
146        let mime = match ext {
147            "png" => "image/png",
148            "webp" => "image/webp",
149            "wav" => "audio/wav",
150            "mp3" => "audio/mpeg",
151            "mp4" => "video/mp4",
152            _ => "application/octet-stream",
153        };
154        let part = reqwest::blocking::multipart::Part::bytes(image)
155            .file_name(format!("{job_id}.{ext}"))
156            .mime_str(mime)?;
157        let form = reqwest::blocking::multipart::Form::new()
158            .text("prompt", prompt.to_string())
159            .text("ext", ext.to_string())
160            .part("image", part);
161        let url = self.url(&format!("/workers/{worker_id}/jobs/{job_id}/complete"));
162        let started = Instant::now();
163        let response = self
164            .client
165            .post(&url)
166            .bearer_auth(token)
167            .multipart(form)
168            .send()?;
169        self.check("complete", &url, started, response)?;
170        Ok(())
171    }
172}