1use anyhow::{Context, Result};
2use rand::{Rng, thread_rng};
3use serde::{Deserialize, Serialize};
4use std::collections::BTreeMap;
5use tokio::time::{Duration, sleep};
6
7use crate::security::ensure_secure_url;
8use crate::{config::AppConfig, episode::EpisodeRecord};
9
10#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct WorkerUploadResponse {
12 pub episode_id: String,
13 pub object_key: String,
14 pub etag: Option<String>,
15}
16
17#[derive(Debug, Clone, Serialize, Deserialize)]
18struct PresignUploadResponse {
19 upload_url: String,
20 object_key: String,
21 headers: Option<BTreeMap<String, String>>,
22}
23
24pub async fn upload_episode(
25 config: &AppConfig,
26 episode: &EpisodeRecord,
27) -> Result<WorkerUploadResponse> {
28 let mode = config.worker.upload_mode.to_ascii_lowercase();
29 if mode == "presigned" {
30 return upload_episode_presigned(config, episode).await;
31 }
32 if mode == "auto" {
33 match upload_episode_presigned(config, episode).await {
34 Ok(v) => return Ok(v),
35 Err(e) => {
36 let text = format!("{e:#}");
37 if !(text.contains("status=404") || text.contains("status=501")) {
38 return Err(e).context("worker upload (auto mode, presigned path)");
39 }
40 }
41 }
42 }
43 upload_episode_legacy(config, episode).await
44}
45
46async fn upload_episode_legacy(
47 config: &AppConfig,
48 episode: &EpisodeRecord,
49) -> Result<WorkerUploadResponse> {
50 let base_url = config
51 .worker
52 .base_url
53 .as_ref()
54 .context("missing TRACE_SHARE_WORKER_BASE_URL")?;
55 ensure_secure_url(base_url, "worker base URL")?;
56
57 let endpoint = format!("{}/v1/episodes", base_url.trim_end_matches('/'));
58 let client = reqwest::Client::builder()
59 .timeout(std::time::Duration::from_secs(
60 config.worker.timeout_seconds.max(5),
61 ))
62 .build()?;
63
64 let mut attempt: u32 = 0;
65 loop {
66 let mut req = client.post(&endpoint).json(episode);
67 if let Some(token) = config.worker.api_token.as_ref() {
68 req = req.bearer_auth(token);
69 }
70
71 let resp = req.send().await;
72 match resp {
73 Ok(resp) => {
74 let status = resp.status();
75 if status.is_success() {
76 return Ok(resp.json::<WorkerUploadResponse>().await?);
77 }
78
79 let body = resp.text().await.unwrap_or_default();
80 if !should_retry_status(status) || attempt >= 4 {
81 anyhow::bail!("worker upload failed: status={} body={}", status, body);
82 }
83 }
84 Err(e) => {
85 let retryable_transport = e.is_timeout() || e.is_connect() || e.is_request();
86 if !retryable_transport || attempt >= 4 {
87 return Err(e).context("worker upload request failed after retries");
88 }
89 }
90 }
91
92 attempt += 1;
93 let jitter: u64 = thread_rng().gen_range(50..300);
94 let wait_ms = (2u64.pow(attempt) * 200) + jitter;
95 sleep(Duration::from_millis(wait_ms)).await;
96 }
97}
98
99async fn upload_episode_presigned(
100 config: &AppConfig,
101 episode: &EpisodeRecord,
102) -> Result<WorkerUploadResponse> {
103 let base_url = config
104 .worker
105 .base_url
106 .as_ref()
107 .context("missing TRACE_SHARE_WORKER_BASE_URL")?;
108 ensure_secure_url(base_url, "worker base URL")?;
109 let presign_endpoint = format!("{}/v1/episodes/presign", base_url.trim_end_matches('/'));
110 let complete_endpoint = format!("{}/v1/episodes/complete", base_url.trim_end_matches('/'));
111 let client = reqwest::Client::builder()
112 .timeout(std::time::Duration::from_secs(
113 config.worker.timeout_seconds.max(5),
114 ))
115 .build()?;
116
117 let presign_payload = serde_json::json!({
118 "episode_id": episode.id,
119 "content_hash": episode.content_hash,
120 "content_type": "application/json",
121 });
122
123 let presign = post_with_retry_json::<PresignUploadResponse>(
124 &client,
125 &presign_endpoint,
126 config.worker.api_token.as_deref(),
127 &presign_payload,
128 "worker episode presign",
129 )
130 .await?;
131
132 let episode_bytes = serde_json::to_vec(episode)?;
133 put_with_retry(
134 &client,
135 &presign.upload_url,
136 presign.headers.as_ref(),
137 &episode_bytes,
138 "worker episode upload",
139 )
140 .await?;
141
142 let complete_payload = serde_json::json!({
143 "episode_id": episode.id,
144 "object_key": presign.object_key,
145 "content_hash": episode.content_hash,
146 });
147
148 post_with_retry_json::<WorkerUploadResponse>(
149 &client,
150 &complete_endpoint,
151 config.worker.api_token.as_deref(),
152 &complete_payload,
153 "worker episode complete",
154 )
155 .await
156}
157
158pub async fn push_revocation(
159 config: &AppConfig,
160 episode_id: &str,
161 revoked_at: &str,
162 reason: Option<&str>,
163) -> Result<()> {
164 let base_url = config
165 .worker
166 .base_url
167 .as_ref()
168 .context("missing TRACE_SHARE_WORKER_BASE_URL")?;
169 ensure_secure_url(base_url, "worker base URL")?;
170
171 let endpoint = format!("{}/v1/revocations", base_url.trim_end_matches('/'));
172 let client = reqwest::Client::builder()
173 .timeout(std::time::Duration::from_secs(
174 config.worker.timeout_seconds.max(5),
175 ))
176 .build()?;
177
178 let payload = serde_json::json!({
179 "episode_id": episode_id,
180 "revoked_at": revoked_at,
181 "reason": reason,
182 });
183
184 let mut attempt: u32 = 0;
185 loop {
186 let mut req = client.post(&endpoint).json(&payload);
187 if let Some(token) = config.worker.api_token.as_ref() {
188 req = req.bearer_auth(token);
189 }
190
191 let resp = req.send().await;
192 match resp {
193 Ok(resp) => {
194 let status = resp.status();
195 if status.is_success() {
196 return Ok(());
197 }
198
199 let body = resp.text().await.unwrap_or_default();
200 if !should_retry_status(status) || attempt >= 4 {
201 anyhow::bail!(
202 "worker revocation push failed: status={} body={}",
203 status,
204 body
205 );
206 }
207 }
208 Err(e) => {
209 let retryable_transport = e.is_timeout() || e.is_connect() || e.is_request();
210 if !retryable_transport || attempt >= 4 {
211 return Err(e).context("worker revocation request failed after retries");
212 }
213 }
214 }
215
216 attempt += 1;
217 let jitter: u64 = thread_rng().gen_range(50..300);
218 let wait_ms = (2u64.pow(attempt) * 200) + jitter;
219 sleep(Duration::from_millis(wait_ms)).await;
220 }
221}
222
223async fn post_with_retry_json<T: for<'de> Deserialize<'de>>(
224 client: &reqwest::Client,
225 endpoint: &str,
226 bearer_token: Option<&str>,
227 payload: &serde_json::Value,
228 label: &str,
229) -> Result<T> {
230 let mut attempt: u32 = 0;
231 loop {
232 let mut req = client.post(endpoint).json(payload);
233 if let Some(token) = bearer_token {
234 req = req.bearer_auth(token);
235 }
236
237 let resp = req.send().await;
238 match resp {
239 Ok(resp) => {
240 let status = resp.status();
241 if status.is_success() {
242 return Ok(resp.json::<T>().await?);
243 }
244 let body = resp.text().await.unwrap_or_default();
245 if !should_retry_status(status) || attempt >= 4 {
246 anyhow::bail!("{label} failed: status={} body={}", status, body);
247 }
248 }
249 Err(e) => {
250 let retryable_transport = e.is_timeout() || e.is_connect() || e.is_request();
251 if !retryable_transport || attempt >= 4 {
252 return Err(e).with_context(|| format!("{label} request failed after retries"));
253 }
254 }
255 }
256
257 attempt += 1;
258 let jitter: u64 = thread_rng().gen_range(50..300);
259 let wait_ms = (2u64.pow(attempt) * 200) + jitter;
260 sleep(Duration::from_millis(wait_ms)).await;
261 }
262}
263
264async fn put_with_retry(
265 client: &reqwest::Client,
266 endpoint: &str,
267 headers: Option<&BTreeMap<String, String>>,
268 body: &[u8],
269 label: &str,
270) -> Result<()> {
271 ensure_secure_url(endpoint, label)?;
272 let mut attempt: u32 = 0;
273 loop {
274 let mut req = client.put(endpoint).body(body.to_vec());
275 if let Some(headers) = headers {
276 for (k, v) in headers {
277 req = req.header(k, v);
278 }
279 }
280
281 let resp = req.send().await;
282 match resp {
283 Ok(resp) => {
284 let status = resp.status();
285 if status.is_success() {
286 return Ok(());
287 }
288 let body = resp.text().await.unwrap_or_default();
289 if !should_retry_status(status) || attempt >= 4 {
290 anyhow::bail!("{label} failed: status={} body={}", status, body);
291 }
292 }
293 Err(e) => {
294 let retryable_transport = e.is_timeout() || e.is_connect() || e.is_request();
295 if !retryable_transport || attempt >= 4 {
296 return Err(e).with_context(|| format!("{label} request failed after retries"));
297 }
298 }
299 }
300
301 attempt += 1;
302 let jitter: u64 = thread_rng().gen_range(50..300);
303 let wait_ms = (2u64.pow(attempt) * 200) + jitter;
304 sleep(Duration::from_millis(wait_ms)).await;
305 }
306}
307
308fn should_retry_status(status: reqwest::StatusCode) -> bool {
309 status == reqwest::StatusCode::TOO_MANY_REQUESTS || status.is_server_error()
310}