Skip to main content

trace_share_core/
worker.rs

1use anyhow::{Context, Result};
2use rand::{Rng, thread_rng};
3use serde::{Deserialize, Serialize};
4use std::collections::BTreeMap;
5use tokio::time::{Duration, sleep};
6
7use crate::security::ensure_secure_url;
8use crate::{config::AppConfig, episode::EpisodeRecord};
9
10#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct WorkerUploadResponse {
12    pub episode_id: String,
13    pub object_key: String,
14    pub etag: Option<String>,
15}
16
17#[derive(Debug, Clone, Serialize, Deserialize)]
18struct PresignUploadResponse {
19    upload_url: String,
20    object_key: String,
21    headers: Option<BTreeMap<String, String>>,
22}
23
24pub async fn upload_episode(
25    config: &AppConfig,
26    episode: &EpisodeRecord,
27) -> Result<WorkerUploadResponse> {
28    let mode = config.worker.upload_mode.to_ascii_lowercase();
29    if mode == "presigned" {
30        return upload_episode_presigned(config, episode).await;
31    }
32    if mode == "auto" {
33        match upload_episode_presigned(config, episode).await {
34            Ok(v) => return Ok(v),
35            Err(e) => {
36                let text = format!("{e:#}");
37                if !(text.contains("status=404") || text.contains("status=501")) {
38                    return Err(e).context("worker upload (auto mode, presigned path)");
39                }
40            }
41        }
42    }
43    upload_episode_legacy(config, episode).await
44}
45
46async fn upload_episode_legacy(
47    config: &AppConfig,
48    episode: &EpisodeRecord,
49) -> Result<WorkerUploadResponse> {
50    let base_url = config
51        .worker
52        .base_url
53        .as_ref()
54        .context("missing TRACE_SHARE_WORKER_BASE_URL")?;
55    ensure_secure_url(base_url, "worker base URL")?;
56
57    let endpoint = format!("{}/v1/episodes", base_url.trim_end_matches('/'));
58    let client = reqwest::Client::builder()
59        .timeout(std::time::Duration::from_secs(
60            config.worker.timeout_seconds.max(5),
61        ))
62        .build()?;
63
64    let mut attempt: u32 = 0;
65    loop {
66        let mut req = client.post(&endpoint).json(episode);
67        if let Some(token) = config.worker.api_token.as_ref() {
68            req = req.bearer_auth(token);
69        }
70
71        let resp = req.send().await;
72        match resp {
73            Ok(resp) => {
74                let status = resp.status();
75                if status.is_success() {
76                    return Ok(resp.json::<WorkerUploadResponse>().await?);
77                }
78
79                let body = resp.text().await.unwrap_or_default();
80                if !should_retry_status(status) || attempt >= 4 {
81                    anyhow::bail!("worker upload failed: status={} body={}", status, body);
82                }
83            }
84            Err(e) => {
85                let retryable_transport = e.is_timeout() || e.is_connect() || e.is_request();
86                if !retryable_transport || attempt >= 4 {
87                    return Err(e).context("worker upload request failed after retries");
88                }
89            }
90        }
91
92        attempt += 1;
93        let jitter: u64 = thread_rng().gen_range(50..300);
94        let wait_ms = (2u64.pow(attempt) * 200) + jitter;
95        sleep(Duration::from_millis(wait_ms)).await;
96    }
97}
98
99async fn upload_episode_presigned(
100    config: &AppConfig,
101    episode: &EpisodeRecord,
102) -> Result<WorkerUploadResponse> {
103    let base_url = config
104        .worker
105        .base_url
106        .as_ref()
107        .context("missing TRACE_SHARE_WORKER_BASE_URL")?;
108    ensure_secure_url(base_url, "worker base URL")?;
109    let presign_endpoint = format!("{}/v1/episodes/presign", base_url.trim_end_matches('/'));
110    let complete_endpoint = format!("{}/v1/episodes/complete", base_url.trim_end_matches('/'));
111    let client = reqwest::Client::builder()
112        .timeout(std::time::Duration::from_secs(
113            config.worker.timeout_seconds.max(5),
114        ))
115        .build()?;
116
117    let presign_payload = serde_json::json!({
118        "episode_id": episode.id,
119        "content_hash": episode.content_hash,
120        "content_type": "application/json",
121    });
122
123    let presign = post_with_retry_json::<PresignUploadResponse>(
124        &client,
125        &presign_endpoint,
126        config.worker.api_token.as_deref(),
127        &presign_payload,
128        "worker episode presign",
129    )
130    .await?;
131
132    let episode_bytes = serde_json::to_vec(episode)?;
133    put_with_retry(
134        &client,
135        &presign.upload_url,
136        presign.headers.as_ref(),
137        &episode_bytes,
138        "worker episode upload",
139    )
140    .await?;
141
142    let complete_payload = serde_json::json!({
143        "episode_id": episode.id,
144        "object_key": presign.object_key,
145        "content_hash": episode.content_hash,
146    });
147
148    post_with_retry_json::<WorkerUploadResponse>(
149        &client,
150        &complete_endpoint,
151        config.worker.api_token.as_deref(),
152        &complete_payload,
153        "worker episode complete",
154    )
155    .await
156}
157
158pub async fn push_revocation(
159    config: &AppConfig,
160    episode_id: &str,
161    revoked_at: &str,
162    reason: Option<&str>,
163) -> Result<()> {
164    let base_url = config
165        .worker
166        .base_url
167        .as_ref()
168        .context("missing TRACE_SHARE_WORKER_BASE_URL")?;
169    ensure_secure_url(base_url, "worker base URL")?;
170
171    let endpoint = format!("{}/v1/revocations", base_url.trim_end_matches('/'));
172    let client = reqwest::Client::builder()
173        .timeout(std::time::Duration::from_secs(
174            config.worker.timeout_seconds.max(5),
175        ))
176        .build()?;
177
178    let payload = serde_json::json!({
179        "episode_id": episode_id,
180        "revoked_at": revoked_at,
181        "reason": reason,
182    });
183
184    let mut attempt: u32 = 0;
185    loop {
186        let mut req = client.post(&endpoint).json(&payload);
187        if let Some(token) = config.worker.api_token.as_ref() {
188            req = req.bearer_auth(token);
189        }
190
191        let resp = req.send().await;
192        match resp {
193            Ok(resp) => {
194                let status = resp.status();
195                if status.is_success() {
196                    return Ok(());
197                }
198
199                let body = resp.text().await.unwrap_or_default();
200                if !should_retry_status(status) || attempt >= 4 {
201                    anyhow::bail!(
202                        "worker revocation push failed: status={} body={}",
203                        status,
204                        body
205                    );
206                }
207            }
208            Err(e) => {
209                let retryable_transport = e.is_timeout() || e.is_connect() || e.is_request();
210                if !retryable_transport || attempt >= 4 {
211                    return Err(e).context("worker revocation request failed after retries");
212                }
213            }
214        }
215
216        attempt += 1;
217        let jitter: u64 = thread_rng().gen_range(50..300);
218        let wait_ms = (2u64.pow(attempt) * 200) + jitter;
219        sleep(Duration::from_millis(wait_ms)).await;
220    }
221}
222
223async fn post_with_retry_json<T: for<'de> Deserialize<'de>>(
224    client: &reqwest::Client,
225    endpoint: &str,
226    bearer_token: Option<&str>,
227    payload: &serde_json::Value,
228    label: &str,
229) -> Result<T> {
230    let mut attempt: u32 = 0;
231    loop {
232        let mut req = client.post(endpoint).json(payload);
233        if let Some(token) = bearer_token {
234            req = req.bearer_auth(token);
235        }
236
237        let resp = req.send().await;
238        match resp {
239            Ok(resp) => {
240                let status = resp.status();
241                if status.is_success() {
242                    return Ok(resp.json::<T>().await?);
243                }
244                let body = resp.text().await.unwrap_or_default();
245                if !should_retry_status(status) || attempt >= 4 {
246                    anyhow::bail!("{label} failed: status={} body={}", status, body);
247                }
248            }
249            Err(e) => {
250                let retryable_transport = e.is_timeout() || e.is_connect() || e.is_request();
251                if !retryable_transport || attempt >= 4 {
252                    return Err(e).with_context(|| format!("{label} request failed after retries"));
253                }
254            }
255        }
256
257        attempt += 1;
258        let jitter: u64 = thread_rng().gen_range(50..300);
259        let wait_ms = (2u64.pow(attempt) * 200) + jitter;
260        sleep(Duration::from_millis(wait_ms)).await;
261    }
262}
263
264async fn put_with_retry(
265    client: &reqwest::Client,
266    endpoint: &str,
267    headers: Option<&BTreeMap<String, String>>,
268    body: &[u8],
269    label: &str,
270) -> Result<()> {
271    ensure_secure_url(endpoint, label)?;
272    let mut attempt: u32 = 0;
273    loop {
274        let mut req = client.put(endpoint).body(body.to_vec());
275        if let Some(headers) = headers {
276            for (k, v) in headers {
277                req = req.header(k, v);
278            }
279        }
280
281        let resp = req.send().await;
282        match resp {
283            Ok(resp) => {
284                let status = resp.status();
285                if status.is_success() {
286                    return Ok(());
287                }
288                let body = resp.text().await.unwrap_or_default();
289                if !should_retry_status(status) || attempt >= 4 {
290                    anyhow::bail!("{label} failed: status={} body={}", status, body);
291                }
292            }
293            Err(e) => {
294                let retryable_transport = e.is_timeout() || e.is_connect() || e.is_request();
295                if !retryable_transport || attempt >= 4 {
296                    return Err(e).with_context(|| format!("{label} request failed after retries"));
297                }
298            }
299        }
300
301        attempt += 1;
302        let jitter: u64 = thread_rng().gen_range(50..300);
303        let wait_ms = (2u64.pow(attempt) * 200) + jitter;
304        sleep(Duration::from_millis(wait_ms)).await;
305    }
306}
307
308fn should_retry_status(status: reqwest::StatusCode) -> bool {
309    status == reqwest::StatusCode::TOO_MANY_REQUESTS || status.is_server_error()
310}