Skip to main content

studio_worker/
http.rs

1//! Thin reqwest wrapper around the studio API.
2//!
3//! Every call goes through [`ApiClient::check`], which:
4//!
5//! - emits a structured `tracing` event on success (`debug`) and
6//!   failure (`warn`) so operators can see what the worker is talking
7//!   to without having to enable wire-level logging in reqwest, and
8//! - turns non-2xx responses into an `anyhow` error tagged with the
9//!   operation name so the existing log shipper messages stay legible.
10use crate::types::*;
11use anyhow::{anyhow, Context, Result};
12use reqwest::blocking::{Client, Response};
13use std::time::{Duration, Instant};
14use tracing::{debug, warn};
15
16/// Base path under which the worker endpoints are mounted.
17const API_PREFIX: &str = "/graphics/api";
18
19/// Tracing target used for every event emitted by the HTTP client.
20/// Keeping it stable lets operators filter with
21/// `RUST_LOG=studio_worker::http=debug` without touching the rest of
22/// the agent's logs.
23const TRACE_TARGET: &str = "studio_worker::http";
24
25pub struct ApiClient {
26    pub base_url: String,
27    pub client: Client,
28}
29
30impl ApiClient {
31    pub fn new(base_url: String) -> Result<Self> {
32        let client = Client::builder()
33            .timeout(Duration::from_secs(60))
34            .build()
35            .context("building reqwest client")?;
36        Ok(Self {
37            base_url: base_url.trim_end_matches('/').to_string(),
38            client,
39        })
40    }
41
42    fn url(&self, path: &str) -> String {
43        format!("{}{}{}", self.base_url, API_PREFIX, path)
44    }
45
46    /// Inspect a response, log it, and convert non-2xx into an
47    /// `anyhow` error.  `op` is the human-readable operation name used
48    /// in the error message (kept stable for log-shipper consumers and
49    /// existing tests).
50    fn check(&self, op: &str, url: &str, started: Instant, response: Response) -> Result<Response> {
51        let status = response.status();
52        let elapsed_ms = started.elapsed().as_millis() as u64;
53        if status.is_success() || status.as_u16() == 204 {
54            debug!(
55                target: TRACE_TARGET,
56                op,
57                endpoint = %url,
58                status = status.as_u16(),
59                elapsed_ms,
60                "ok"
61            );
62            return Ok(response);
63        }
64        // Body read consumes the response; we only need it on the
65        // failure path.
66        let body = response.text().unwrap_or_default();
67        warn!(
68            target: TRACE_TARGET,
69            op,
70            endpoint = %url,
71            status = status.as_u16(),
72            elapsed_ms,
73            body = %body,
74            "{op} failed"
75        );
76        Err(anyhow!("{op} failed: {status} — {body}"))
77    }
78
79    pub fn register(
80        &self,
81        bootstrap_token: &str,
82        cap: WorkerCapabilities,
83        worker_id: Option<String>,
84    ) -> Result<RegisterResponse> {
85        let body = RegisterRequest {
86            bootstrap_token: bootstrap_token.to_string(),
87            capabilities: cap,
88            worker_id,
89        };
90        let url = self.url("/workers/register");
91        let started = Instant::now();
92        let response = self
93            .client
94            .post(&url)
95            .bearer_auth(bootstrap_token)
96            .json(&body)
97            .send()?;
98        let response = self.check("register", &url, started, response)?;
99        Ok(response.json()?)
100    }
101
102    pub fn heartbeat(
103        &self,
104        worker_id: &str,
105        token: &str,
106        cap: WorkerCapabilities,
107        current_job_id: Option<String>,
108    ) -> Result<()> {
109        let body = HeartbeatRequest {
110            capabilities: cap,
111            current_job_id,
112        };
113        let url = self.url(&format!("/workers/{worker_id}/heartbeat"));
114        let started = Instant::now();
115        let response = self
116            .client
117            .post(&url)
118            .bearer_auth(token)
119            .json(&body)
120            .send()?;
121        self.check("heartbeat", &url, started, response)?;
122        Ok(())
123    }
124
125    /// Returns `Ok(None)` on HTTP 204 (no jobs).
126    pub fn claim(&self, worker_id: &str, token: &str) -> Result<Option<JobClaim>> {
127        let url = self.url(&format!("/workers/{worker_id}/claim"));
128        let started = Instant::now();
129        let response = self.client.post(&url).bearer_auth(token).send()?;
130        let response = self.check("claim", &url, started, response)?;
131        if response.status().as_u16() == 204 {
132            return Ok(None);
133        }
134        Ok(Some(response.json()?))
135    }
136
137    /// Complete a job with binary output (image / audio / video).
138    pub fn complete(
139        &self,
140        worker_id: &str,
141        token: &str,
142        job_id: &str,
143        ext: &str,
144        prompt: &str,
145        image: Vec<u8>,
146    ) -> Result<()> {
147        let mime = match ext {
148            "png" => "image/png",
149            "webp" => "image/webp",
150            "wav" => "audio/wav",
151            "mp3" => "audio/mpeg",
152            "mp4" => "video/mp4",
153            _ => "application/octet-stream",
154        };
155        let part = reqwest::blocking::multipart::Part::bytes(image)
156            .file_name(format!("{job_id}.{ext}"))
157            .mime_str(mime)?;
158        let form = reqwest::blocking::multipart::Form::new()
159            .text("prompt", prompt.to_string())
160            .text("ext", ext.to_string())
161            .part("image", part);
162        let url = self.url(&format!("/workers/{worker_id}/jobs/{job_id}/complete"));
163        let started = Instant::now();
164        let response = self
165            .client
166            .post(&url)
167            .bearer_auth(token)
168            .multipart(form)
169            .send()?;
170        self.check("complete", &url, started, response)?;
171        Ok(())
172    }
173
174    /// Complete a job with structured JSON output (LLM / STT).
175    pub fn complete_json(
176        &self,
177        worker_id: &str,
178        token: &str,
179        job_id: &str,
180        prompt: &str,
181        result: &serde_json::Value,
182    ) -> Result<()> {
183        let body = serde_json::json!({
184            "jobId": job_id,
185            "prompt": prompt,
186            "result": result,
187            "resultKind": "json",
188        });
189        let url = self.url(&format!("/workers/{worker_id}/jobs/{job_id}/complete-json"));
190        let started = Instant::now();
191        let response = self
192            .client
193            .post(&url)
194            .bearer_auth(token)
195            .json(&body)
196            .send()?;
197        self.check("complete-json", &url, started, response)?;
198        Ok(())
199    }
200
201    pub fn fail(
202        &self,
203        worker_id: &str,
204        token: &str,
205        job_id: &str,
206        error: &str,
207        retryable: bool,
208    ) -> Result<()> {
209        let body = FailRequest {
210            error: error.to_string(),
211            retryable,
212        };
213        let url = self.url(&format!("/workers/{worker_id}/jobs/{job_id}/fail"));
214        let started = Instant::now();
215        let response = self
216            .client
217            .post(&url)
218            .bearer_auth(token)
219            .json(&body)
220            .send()?;
221        self.check("fail", &url, started, response)?;
222        Ok(())
223    }
224
225    pub fn ship_logs(&self, worker_id: &str, token: &str, batch: LogBatch) -> Result<()> {
226        let url = self.url(&format!("/workers/{worker_id}/logs"));
227        let started = Instant::now();
228        let response = self
229            .client
230            .post(&url)
231            .bearer_auth(token)
232            .json(&batch)
233            .send()?;
234        self.check("log ship", &url, started, response)?;
235        Ok(())
236    }
237}