1use crate::factory::{AsyncWriterFactory, CursorWriterFactory, FileWriterFactory};
2use crate::geturl::{get_url, RetriableResult};
3use crate::object::download_impl;
4use crate::service::{ObjectService, QueueService};
5use anyhow::{anyhow, bail, Context, Result};
6use taskcluster::retry::{Backoff, Retry};
7use taskcluster::{ClientBuilder, Credentials, Object, Queue};
8use tokio::fs::File;
9
10pub async fn download_artifact_to_vec(
14 task_id: &str,
15 run_id: Option<&str>,
16 name: &str,
17 retry: &Retry,
18 queue_service: &Queue,
19) -> Result<(Vec<u8>, String)> {
20 let mut factory = CursorWriterFactory::new();
21 let content_type = download_artifact_impl(
22 task_id,
23 run_id,
24 name,
25 retry,
26 queue_service,
27 object_service_factory,
28 &mut factory,
29 )
30 .await?;
31 Ok((factory.into_inner(), content_type))
32}
33
34pub async fn download_artifact_to_buf<'a>(
39 task_id: &str,
40 run_id: Option<&str>,
41 name: &str,
42 retry: &Retry,
43 queue_service: &Queue,
44 buf: &'a mut [u8],
45) -> Result<(&'a [u8], String)> {
46 let mut factory = CursorWriterFactory::for_buf(buf);
47 let content_type = download_artifact_impl(
48 task_id,
49 run_id,
50 name,
51 retry,
52 queue_service,
53 object_service_factory,
54 &mut factory,
55 )
56 .await?;
57 let size = factory.size();
58 Ok((&buf[..size], content_type))
59}
60
61pub async fn download_artifact_to_file(
66 task_id: &str,
67 run_id: Option<&str>,
68 name: &str,
69 retry: &Retry,
70 queue_service: &Queue,
71 file: File,
72) -> Result<(File, String)> {
73 let mut factory = FileWriterFactory::new(file);
74 let content_type = download_artifact_impl(
75 task_id,
76 run_id,
77 name,
78 retry,
79 queue_service,
80 object_service_factory,
81 &mut factory,
82 )
83 .await?;
84 Ok((factory.into_inner().await?, content_type))
85}
86
87pub async fn download_artifact_with_factory<AWF: AsyncWriterFactory>(
91 task_id: &str,
92 run_id: Option<&str>,
93 name: &str,
94 retry: &Retry,
95 queue_service: &Queue,
96 writer_factory: &mut AWF,
97) -> Result<String> {
98 let content_type = download_artifact_impl(
99 task_id,
100 run_id,
101 name,
102 retry,
103 queue_service,
104 object_service_factory,
105 writer_factory,
106 )
107 .await?;
108 Ok(content_type)
109}
110
111fn object_service_factory(queue: &Queue, creds: Credentials, retry: &Retry) -> Result<Object> {
114 Object::new(
115 ClientBuilder::new(queue.client.root_url())
116 .credentials(creds)
117 .retry(retry.clone()),
118 )
119}
120
121async fn download_artifact_impl<Q, O, OF, AWF>(
122 task_id: &str,
123 run_id: Option<&str>,
124 name: &str,
125 retry: &Retry,
126 queue_service: &Q,
127 object_service_factory: OF,
128 writer_factory: &mut AWF,
129) -> Result<String>
130where
131 Q: QueueService,
132 O: ObjectService,
133 OF: FnOnce(&Q, Credentials, &Retry) -> Result<O>,
134 AWF: AsyncWriterFactory,
135{
136 let artifact = if let Some(run_id) = run_id {
137 queue_service.artifact(task_id, run_id, name).await?
138 } else {
139 queue_service.latestArtifact(task_id, name).await?
140 };
141
142 fn get_str<'a>(v: &'a serde_json::Value, name: &str, p: &str) -> Result<&'a str> {
143 Ok(v.get(p)
144 .ok_or_else(|| anyhow!("{} property {} not found", name, p))?
145 .as_str()
146 .ok_or_else(|| anyhow!("{} property {} is not a string", name, p))?)
147 }
148
149 let storage_type = get_str(&artifact, "artifact", "storageType")?;
150 match storage_type {
151 "s3" | "reference" => {
152 let url = get_str(&artifact, "artifact", "url")?;
153 return download_url(url, retry, writer_factory).await;
154 }
155 "object" => {
156 let creds_json = artifact
158 .get("credentials")
159 .ok_or_else(|| anyhow!("Artifact property credentials not found"))?;
160 let client_id = get_str(&creds_json, "artifact.credentials", "client_id")?;
161 let access_token = get_str(&creds_json, "artifact.credentials", "access_token")?;
162 let certificate_res = get_str(&creds_json, "artifact.credentials", "certificate");
164 let creds = if let Ok(certificate) = certificate_res {
165 Credentials::new_with_certificate(client_id, access_token, certificate)
166 } else {
167 Credentials::new(client_id, access_token)
168 };
169 let object_service = object_service_factory(queue_service, creds, retry)?;
170
171 let name = get_str(&artifact, "artifact", "name")?;
172
173 return download_impl(name, retry, &object_service, writer_factory).await;
175 }
176 "error" => {
177 let message = get_str(&artifact, "artifact", "message")?;
178 let reason = get_str(&artifact, "artifact", "reason")?;
179 return Err(anyhow!("{}", reason).context(format!("Error Artifact: {}", message)));
181 }
182 st => bail!("Unknown artifact storageType {}", st),
183 };
184}
185
186async fn download_url<AWF: AsyncWriterFactory>(
187 url: &str,
188 retry: &Retry,
189 writer_factory: &mut AWF,
190) -> Result<String> {
191 let mut backoff = Backoff::new(retry);
192 let mut attempts = 0;
193
194 loop {
195 attempts += 1;
196 let mut writer = writer_factory.get_writer().await?;
197 match get_url(url, writer.as_mut()).await {
198 RetriableResult::Ok(fetchmeta) => return Ok(fetchmeta.content_type),
199 RetriableResult::Retriable(err) => match backoff.next_backoff() {
200 Some(duration) => {
201 tokio::time::sleep(duration).await;
202 continue;
203 }
204 None => {
205 return Err(err).context(format!("Download failed after {} attempts", attempts))
206 }
207 },
208 RetriableResult::Permanent(err) => {
209 return Err(err);
210 }
211 }
212 }
213}
214
#[cfg(test)]
mod test {
    use super::*;
    use crate::test_helpers::{FakeDataServer, FakeObjectService, FakeQueueService, Logger};
    use serde_json::json;
    use taskcluster::chrono::{Duration, Utc};

    /// Task ID passed to every download call in these tests.
    const TASK_ID: &str = "LyTqA-MYReaNrLTYYHyrtw";

    /// Artifact name passed to every download call in these tests.
    const ARTIFACT_NAME: &str = "public/thing.txt";

    /// Object-service factory for tests that must never reach the object service;
    /// calling it panics.
    fn unused_object_service_factory(
        _queue: &FakeQueueService,
        _creds: Credentials,
        _retry: &Retry,
    ) -> Result<Object> {
        unreachable!()
    }

    /// Build a fake queue service that records calls on (a clone of) `logger` and is
    /// configured with `response` as its artifact response.
    fn fake_queue(logger: &Logger, response: serde_json::Value) -> FakeQueueService {
        FakeQueueService {
            logger: logger.clone(),
            response,
        }
    }

    /// Download an "s3" artifact from a fake data server configured with the status
    /// sequence `[500, 200]`, then check the logged queue call, the content type and the
    /// downloaded data.
    async fn check_s3_download(run_id: Option<&str>, expected_call: &str) -> Result<()> {
        let server = FakeDataServer::new(true, &[500, 200]);
        let mut factory = CursorWriterFactory::new();
        let logger = Logger::default();
        let queue_service = fake_queue(
            &logger,
            json!({
                "storageType": "s3",
                "url": server.data_url(),
            }),
        );

        let content_type = download_artifact_impl(
            TASK_ID,
            run_id,
            ARTIFACT_NAME,
            &Retry::default(),
            &queue_service,
            unused_object_service_factory,
            &mut factory,
        )
        .await?;

        logger.assert(vec![expected_call.to_owned()]);

        assert_eq!(&content_type, "text/plain");

        let data = factory.into_inner();
        assert_eq!(&data, b"hello, world");

        Ok(())
    }

    /// Download an "object" artifact whose response carries `credentials`, checking that
    /// the object-service factory receives client ID "c", access token "a" and the
    /// `expected_certificate`, and that the download goes through the object service.
    async fn check_object_download(
        credentials: serde_json::Value,
        expected_certificate: Option<String>,
    ) -> Result<()> {
        let server = FakeDataServer::new(false, &[200]);
        let mut factory = CursorWriterFactory::new();
        let logger = Logger::default();
        let queue_service = fake_queue(
            &logger,
            json!({
                "storageType": "object",
                "name": "artifacts/data",
                "credentials": credentials,
            }),
        );

        let make_object_service = {
            let logger = logger.clone();
            let url = server.data_url();
            move |_queue: &FakeQueueService, creds: Credentials, _retry: &Retry| {
                assert_eq!(creds.client_id, "c");
                assert_eq!(creds.access_token, "a");
                assert_eq!(creds.certificate, expected_certificate);
                Ok(FakeObjectService {
                    logger,
                    response: json!({
                        "method": "getUrl",
                        "url": url,
                        "hashes": {
                            "sha256":"09ca7e4eaa6e8ae9c7d261167129184883644d07dfba7cbfbc4c8a2e08360d5b",
                        },
                        "expires": Utc::now() + Duration::hours(2),
                    }),
                })
            }
        };

        let content_type = download_artifact_impl(
            TASK_ID,
            Some("2"),
            ARTIFACT_NAME,
            &Retry::default(),
            &queue_service,
            make_object_service,
            &mut factory,
        )
        .await?;

        logger.assert(vec![
            "artifact LyTqA-MYReaNrLTYYHyrtw 2 public/thing.txt".to_owned(),
            "startDownload artifacts/data {\"getUrl\":true}".to_owned(),
        ]);

        assert_eq!(&content_type, "text/plain");

        let data = factory.into_inner();
        assert_eq!(&data, b"hello, world");

        Ok(())
    }

    #[tokio::test]
    async fn s3_artifact_with_retry() -> Result<()> {
        check_s3_download(
            Some("1"),
            "artifact LyTqA-MYReaNrLTYYHyrtw 1 public/thing.txt",
        )
        .await
    }

    #[tokio::test]
    async fn s3_latest_artifact_with_retry() -> Result<()> {
        check_s3_download(
            None,
            "latestArtifact LyTqA-MYReaNrLTYYHyrtw public/thing.txt",
        )
        .await
    }

    #[tokio::test]
    async fn object_artifact() -> Result<()> {
        check_object_download(
            json!({
                "client_id": "c",
                "access_token": "a",
                "certificate": "cert",
            }),
            Some("cert".to_owned()),
        )
        .await
    }

    #[tokio::test]
    async fn object_artifact_no_cert() -> Result<()> {
        check_object_download(
            json!({
                "client_id": "c",
                "access_token": "a",
            }),
            None,
        )
        .await
    }

    #[tokio::test]
    async fn error_artifact() -> Result<()> {
        let mut factory = CursorWriterFactory::new();
        let logger = Logger::default();
        let queue_service = fake_queue(
            &logger,
            json!({
                "storageType": "error",
                "message": "uhoh",
                "reason": "test case",
            }),
        );

        let outcome = download_artifact_impl(
            TASK_ID,
            None,
            ARTIFACT_NAME,
            &Retry::default(),
            &queue_service,
            unused_object_service_factory,
            &mut factory,
        )
        .await;

        logger.assert(vec![
            "latestArtifact LyTqA-MYReaNrLTYYHyrtw public/thing.txt".to_owned(),
        ]);

        // The artifact's message is the outer context; its reason is the root cause.
        let err = outcome.expect_err("Should have returned an Err");
        assert_eq!(format!("{}", err), "Error Artifact: uhoh");
        assert_eq!(format!("{}", err.root_cause()), "test case");

        Ok(())
    }
}