Skip to main content

studio_worker/
http.rs

1//! Thin reqwest wrapper around the studio API.
2//!
3//! Every call goes through [`ApiClient::check`], which:
4//!
5//! - emits a structured `tracing` event on success (`debug`) and
6//!   failure (`warn`) so operators can see what the worker is talking
7//!   to without having to enable wire-level logging in reqwest
8//!   (`complete` also logs the upload byte size before the request so
9//!   the attempted payload size is visible even when it never finishes),
10//!   and
11//! - turns non-2xx responses into an `anyhow` error tagged with the
12//!   operation name so the existing log shipper messages stay legible.
13use crate::types::*;
14use anyhow::{anyhow, Context, Result};
15use reqwest::blocking::{Client, Response};
16use std::time::{Duration, Instant};
17use tracing::{debug, warn};
18
/// Path prefix that every worker endpoint URL built by this module
/// starts with, placed directly after the configured base URL.
const API_PREFIX: &str = "/graphics/api";

/// `tracing` target attached to each event this HTTP client emits.
/// It is a fixed string so operators can turn the client's events up
/// on their own with `RUST_LOG=studio_worker::http=debug`, leaving
/// the verbosity of the rest of the agent untouched.
const TRACE_TARGET: &str = "studio_worker::http";
27
28pub struct ApiClient {
29    pub base_url: String,
30    pub client: Client,
31}
32
33impl ApiClient {
34    pub fn new(base_url: String) -> Result<Self> {
35        let client = Client::builder()
36            .timeout(Duration::from_secs(60))
37            .build()
38            .context("building reqwest client")?;
39        Ok(Self {
40            base_url: normalize_base_url(&base_url)?,
41            client,
42        })
43    }
44
45    fn url(&self, path: &str) -> String {
46        format!("{}{}{}", self.base_url, API_PREFIX, path)
47    }
48
49    /// Inspect a response, log it, and convert non-2xx into an
50    /// `anyhow` error.  `op` is the human-readable operation name used
51    /// in the error message (kept stable for log-shipper consumers and
52    /// existing tests).
53    fn check(&self, op: &str, url: &str, started: Instant, response: Response) -> Result<Response> {
54        let status = response.status();
55        let elapsed_ms = started.elapsed().as_millis() as u64;
56        if status.is_success() || status.as_u16() == 204 {
57            debug!(
58                target: TRACE_TARGET,
59                op,
60                endpoint = %url,
61                status = status.as_u16(),
62                elapsed_ms,
63                "ok"
64            );
65            return Ok(response);
66        }
67        // Body read consumes the response; we only need it on the
68        // failure path.
69        let body = response.text().unwrap_or_default();
70        warn!(
71            target: TRACE_TARGET,
72            op,
73            endpoint = %url,
74            status = status.as_u16(),
75            elapsed_ms,
76            body = %body,
77            "{op} failed"
78        );
79        Err(anyhow!("{op} failed: {status} — {body}"))
80    }
81
82    // -----------------------------------------------------------------------
83    // Auto-register (operator-approved) flow
84    // -----------------------------------------------------------------------
85
86    /// Create a Pending Workers row.  Unauthenticated on purpose —
87    /// the studio rate-limits this endpoint by source IP and the
88    /// operator manually approves before the worker can do anything.
89    pub fn register_request(
90        &self,
91        payload: &AutoRegisterRequest,
92    ) -> Result<AutoRegisterRequestResponse> {
93        let url = self.url("/workers/register-request");
94        let started = Instant::now();
95        let response = self.client.post(&url).json(payload).send()?;
96        let response = self.check("register-request", &url, started, response)?;
97        Ok(response.json()?)
98    }
99
100    /// Poll the studio for the operator's decision on a previously
101    /// submitted register-request.  Returns `Ok(None)` when the
102    /// request id is unknown to the studio (likely cleaned up or
103    /// never existed) so the orchestrator can drop the stale id and
104    /// start a fresh one.  Auth is the raw `registration_secret`
105    /// presented as a Bearer token.
106    pub fn poll_register_status(
107        &self,
108        request_id: &str,
109        registration_secret: &str,
110    ) -> Result<Option<RegisterStatus>> {
111        let url = self.url(&format!("/workers/register-requests/{request_id}"));
112        let started = Instant::now();
113        let response = self
114            .client
115            .get(&url)
116            .bearer_auth(registration_secret)
117            .send()?;
118        if response.status().as_u16() == 404 {
119            debug!(
120                target: TRACE_TARGET,
121                op = "register-poll",
122                endpoint = %url,
123                status = 404,
124                elapsed_ms = started.elapsed().as_millis() as u64,
125                "register request not found (stale id; orchestrator will recreate)"
126            );
127            return Ok(None);
128        }
129        let response = self.check("register-poll", &url, started, response)?;
130        Ok(Some(response.json()?))
131    }
132
133    /// Complete a job with binary output (image / audio / video).
134    ///
135    /// This is the only worker-side HTTP route that survives the WS
136    /// migration: R2 multipart doesn't fit cleanly into WS frames.
137    /// Heartbeats, claim/accept/reject, completeJson, fail, and log
138    /// shipping all flow over the WS session owned by
139    /// `ws::session::spawn_ws_session`.
140    pub fn complete(
141        &self,
142        worker_id: &str,
143        token: &str,
144        job_id: &str,
145        ext: &str,
146        prompt: &str,
147        image: Vec<u8>,
148    ) -> Result<()> {
149        let mime = mime_for_ext(ext);
150        let bytes = image.len() as u64;
151        // Emitted before the (potentially slow or failing) upload so the
152        // attempted payload size is always in the operator's logs, even
153        // when the request itself never completes.
154        debug!(
155            target: TRACE_TARGET,
156            op = "complete",
157            job_id,
158            ext,
159            mime,
160            bytes,
161            "uploading job result"
162        );
163        let part = reqwest::blocking::multipart::Part::bytes(image)
164            .file_name(format!("{job_id}.{ext}"))
165            .mime_str(mime)?;
166        let form = reqwest::blocking::multipart::Form::new()
167            .text("prompt", prompt.to_string())
168            .text("ext", ext.to_string())
169            .part("image", part);
170        let url = self.url(&format!("/workers/{worker_id}/jobs/{job_id}/complete"));
171        let started = Instant::now();
172        let response = self
173            .client
174            .post(&url)
175            .bearer_auth(token)
176            .multipart(form)
177            .send()?;
178        self.check("complete", &url, started, response)?;
179        Ok(())
180    }
181}
182
183fn normalize_base_url(base_url: &str) -> Result<String> {
184    let mut url =
185        url::Url::parse(base_url).map_err(|e| anyhow!("invalid api_base_url {base_url:?}: {e}"))?;
186    url.set_query(None);
187    url.set_fragment(None);
188
189    let trimmed_path = url.path().trim_end_matches('/').to_string();
190    if trimmed_path.ends_with(API_PREFIX) {
191        let without_prefix = trimmed_path[..trimmed_path.len() - API_PREFIX.len()].to_string();
192        url.set_path(if without_prefix.is_empty() {
193            "/"
194        } else {
195            &without_prefix
196        });
197    }
198
199    Ok(url.as_str().trim_end_matches('/').to_string())
200}
201
/// Map a binary output's file extension to the MIME type sent as the
/// multipart `complete` upload's `Content-Type`.  Single source of
/// truth: every engine that emits a `TaskResult` binary extension
/// (synthetic image → `png`/`webp`, sd-cpp → `webp`, tts → `wav`,
/// synthetic video → `webp`, the `video` feature → `gif`) routes
/// through here, so a new extension can't silently drift into
/// `application/octet-stream` and break the studio's stored
/// content-type.
///
/// `ext` is given without a leading dot and is compared ignoring
/// ASCII case, so `"PNG"` and `"png"` resolve to the same type.
/// Anything not in the table, the empty string included, yields
/// `application/octet-stream`.
pub fn mime_for_ext(ext: &str) -> &'static str {
    /// Known extensions (lowercase) paired with their MIME types.
    const KNOWN: &[(&str, &str)] = &[
        ("png", "image/png"),
        ("webp", "image/webp"),
        ("gif", "image/gif"),
        ("wav", "audio/wav"),
        ("mp3", "audio/mpeg"),
        ("mp4", "video/mp4"),
    ];

    KNOWN
        .iter()
        .find(|(known, _)| known.eq_ignore_ascii_case(ext))
        .map_or("application/octet-stream", |&(_, mime)| mime)
}
221
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a client from `base` and return the register-request URL
    /// it would call.
    fn register_request_url(base: &str) -> String {
        ApiClient::new(base.into())
            .unwrap()
            .url("/workers/register-request")
    }

    #[test]
    fn mime_for_ext_maps_known_image_audio_video_types() {
        let expected = [
            ("png", "image/png"),
            ("webp", "image/webp"),
            ("gif", "image/gif"),
            ("wav", "audio/wav"),
            ("mp3", "audio/mpeg"),
            ("mp4", "video/mp4"),
        ];
        for (ext, mime) in expected {
            assert_eq!(mime_for_ext(ext), mime);
        }
    }

    #[test]
    fn mime_for_ext_falls_back_to_octet_stream_for_unknown() {
        for ext in ["bin", ""] {
            assert_eq!(mime_for_ext(ext), "application/octet-stream");
        }
    }

    #[test]
    fn normalize_base_url_strips_existing_graphics_api_prefix() {
        // A base URL that already ends in the API prefix must not end
        // up with the prefix twice.
        assert_eq!(
            register_request_url("https://studio.example/graphics/api/"),
            "https://studio.example/graphics/api/workers/register-request"
        );
    }

    #[test]
    fn normalize_base_url_preserves_outer_mount_path() {
        // Only the trailing API prefix is stripped; the path in front
        // of it stays in place.
        assert_eq!(
            register_request_url("https://studio.example/custom/graphics/api"),
            "https://studio.example/custom/graphics/api/workers/register-request"
        );
    }

    #[test]
    fn mime_for_ext_covers_every_extension_engines_emit() {
        // Contract guard: every binary extension an engine really
        // emits has to resolve to a concrete MIME type and never to
        // the octet-stream fallback.  `gif` comes from the `video`
        // feature and is the one that regressed before this test
        // existed.
        let engine_extensions = ["png", "webp", "gif", "wav"];
        for ext in engine_extensions {
            assert_ne!(
                mime_for_ext(ext),
                "application/octet-stream",
                "engine output extension {ext:?} must map to a real MIME type"
            );
        }
    }
}