taskcluster_download/
artifact.rs

1use crate::factory::{AsyncWriterFactory, CursorWriterFactory, FileWriterFactory};
2use crate::geturl::{get_url, RetriableResult};
3use crate::object::download_impl;
4use crate::service::{ObjectService, QueueService};
5use anyhow::{anyhow, bail, Context, Result};
6use taskcluster::retry::{Backoff, Retry};
7use taskcluster::{ClientBuilder, Credentials, Object, Queue};
8use tokio::fs::File;
9
10/// Download an artifact to a [Vec<u8>] and return that.  If the artifact is unexpectedly large,
11/// this may exhaust system memory and panic.  If `run_id` is None then the latest run will be
12/// used.  Returns (data, content_type).
13pub async fn download_artifact_to_vec(
14    task_id: &str,
15    run_id: Option<&str>,
16    name: &str,
17    retry: &Retry,
18    queue_service: &Queue,
19) -> Result<(Vec<u8>, String)> {
20    let mut factory = CursorWriterFactory::new();
21    let content_type = download_artifact_impl(
22        task_id,
23        run_id,
24        name,
25        retry,
26        queue_service,
27        object_service_factory,
28        &mut factory,
29    )
30    .await?;
31    Ok((factory.into_inner(), content_type))
32}
33
34/// Download an artifact into the given buffer and return the slice of that buffer containing the
35/// artifact.  If the artifact is larger than the buffer, then resulting error can be downcast to
36/// [std::io::Error] with kind `WriteZero` and the somewhat cryptic message "write zero byte into
37/// writer".  Returns (slice, content_type).  If `run_id` is None then the latest run will be used.
38pub async fn download_artifact_to_buf<'a>(
39    task_id: &str,
40    run_id: Option<&str>,
41    name: &str,
42    retry: &Retry,
43    queue_service: &Queue,
44    buf: &'a mut [u8],
45) -> Result<(&'a [u8], String)> {
46    let mut factory = CursorWriterFactory::for_buf(buf);
47    let content_type = download_artifact_impl(
48        task_id,
49        run_id,
50        name,
51        retry,
52        queue_service,
53        object_service_factory,
54        &mut factory,
55    )
56    .await?;
57    let size = factory.size();
58    Ok((&buf[..size], content_type))
59}
60
61/// Download an artifact into the given File.  The file must be open in write mode and must be
62/// clone-able (that is, [File::try_clone()] must succeed) in order to support retried downloads.
63/// The File is returned with all write operations complete but with unspecified position.  If
64/// `run_id` is None then the latest run will be used.  Returns (file, content_type).
65pub async fn download_artifact_to_file(
66    task_id: &str,
67    run_id: Option<&str>,
68    name: &str,
69    retry: &Retry,
70    queue_service: &Queue,
71    file: File,
72) -> Result<(File, String)> {
73    let mut factory = FileWriterFactory::new(file);
74    let content_type = download_artifact_impl(
75        task_id,
76        run_id,
77        name,
78        retry,
79        queue_service,
80        object_service_factory,
81        &mut factory,
82    )
83    .await?;
84    Ok((factory.into_inner().await?, content_type))
85}
86
87/// Download an artifact using an [AsyncWriterFactory].  This is useful for advanced cases where one
88/// of the convenience functions is not adequate.  Returns the artifact's content type.  If
89/// `run_id` is None then the latest run will be used.  Returns the content type.
90pub async fn download_artifact_with_factory<AWF: AsyncWriterFactory>(
91    task_id: &str,
92    run_id: Option<&str>,
93    name: &str,
94    retry: &Retry,
95    queue_service: &Queue,
96    writer_factory: &mut AWF,
97) -> Result<String> {
98    let content_type = download_artifact_impl(
99        task_id,
100        run_id,
101        name,
102        retry,
103        queue_service,
104        object_service_factory,
105        writer_factory,
106    )
107    .await?;
108    Ok(content_type)
109}
110
111/// Create an object service client with the given credentials and retry configuration.
112/// This allows injection of fake object services during testing.
113fn object_service_factory(queue: &Queue, creds: Credentials, retry: &Retry) -> Result<Object> {
114    Object::new(
115        ClientBuilder::new(queue.client.root_url())
116            .credentials(creds)
117            .retry(retry.clone()),
118    )
119}
120
121async fn download_artifact_impl<Q, O, OF, AWF>(
122    task_id: &str,
123    run_id: Option<&str>,
124    name: &str,
125    retry: &Retry,
126    queue_service: &Q,
127    object_service_factory: OF,
128    writer_factory: &mut AWF,
129) -> Result<String>
130where
131    Q: QueueService,
132    O: ObjectService,
133    OF: FnOnce(&Q, Credentials, &Retry) -> Result<O>,
134    AWF: AsyncWriterFactory,
135{
136    let artifact = if let Some(run_id) = run_id {
137        queue_service.artifact(task_id, run_id, name).await?
138    } else {
139        queue_service.latestArtifact(task_id, name).await?
140    };
141
142    fn get_str<'a>(v: &'a serde_json::Value, name: &str, p: &str) -> Result<&'a str> {
143        Ok(v.get(p)
144            .ok_or_else(|| anyhow!("{} property {} not found", name, p))?
145            .as_str()
146            .ok_or_else(|| anyhow!("{} property {} is not a string", name, p))?)
147    }
148
149    let storage_type = get_str(&artifact, "artifact", "storageType")?;
150    match storage_type {
151        "s3" | "reference" => {
152            let url = get_str(&artifact, "artifact", "url")?;
153            return download_url(url, retry, writer_factory).await;
154        }
155        "object" => {
156            // create a new object-service client based on the given credentials
157            let creds_json = artifact
158                .get("credentials")
159                .ok_or_else(|| anyhow!("Artifact property credentials not found"))?;
160            let client_id = get_str(&creds_json, "artifact.credentials", "client_id")?;
161            let access_token = get_str(&creds_json, "artifact.credentials", "access_token")?;
162            // certificate may not be present
163            let certificate_res = get_str(&creds_json, "artifact.credentials", "certificate");
164            let creds = if let Ok(certificate) = certificate_res {
165                Credentials::new_with_certificate(client_id, access_token, certificate)
166            } else {
167                Credentials::new(client_id, access_token)
168            };
169            let object_service = object_service_factory(queue_service, creds, retry)?;
170
171            let name = get_str(&artifact, "artifact", "name")?;
172
173            // defer to the object-service download support
174            return download_impl(name, retry, &object_service, writer_factory).await;
175        }
176        "error" => {
177            let message = get_str(&artifact, "artifact", "message")?;
178            let reason = get_str(&artifact, "artifact", "reason")?;
179            // this looks backward, but results in an Error with "{message}.. caused by {reason}"
180            return Err(anyhow!("{}", reason).context(format!("Error Artifact: {}", message)));
181        }
182        st => bail!("Unknown artifact storageType {}", st),
183    };
184}
185
186async fn download_url<AWF: AsyncWriterFactory>(
187    url: &str,
188    retry: &Retry,
189    writer_factory: &mut AWF,
190) -> Result<String> {
191    let mut backoff = Backoff::new(retry);
192    let mut attempts = 0;
193
194    loop {
195        attempts += 1;
196        let mut writer = writer_factory.get_writer().await?;
197        match get_url(url, writer.as_mut()).await {
198            RetriableResult::Ok(fetchmeta) => return Ok(fetchmeta.content_type),
199            RetriableResult::Retriable(err) => match backoff.next_backoff() {
200                Some(duration) => {
201                    tokio::time::sleep(duration).await;
202                    continue;
203                }
204                None => {
205                    return Err(err).context(format!("Download failed after {} attempts", attempts))
206                }
207            },
208            RetriableResult::Permanent(err) => {
209                return Err(err);
210            }
211        }
212    }
213}
214
#[cfg(test)]
mod test {
    use super::*;
    use crate::test_helpers::{FakeDataServer, FakeObjectService, FakeQueueService, Logger};
    use serde_json::json;
    use taskcluster::chrono::{Duration, Utc};

    // Task ID requested by every test case.
    const TASK_ID: &str = "LyTqA-MYReaNrLTYYHyrtw";

    // Artifact name requested by every test case.
    const ARTIFACT_NAME: &str = "public/thing.txt";

    // SHA-256 digest the fake object service advertises for the downloaded data.
    const DATA_SHA256: &str = "09ca7e4eaa6e8ae9c7d261167129184883644d07dfba7cbfbc4c8a2e08360d5b";

    /// object_service_factory function for cases where the object service is not used
    fn unused_object_service_factory(
        _queue: &FakeQueueService,
        _creds: Credentials,
        _retry: &Retry,
    ) -> Result<Object> {
        unreachable!()
    }

    #[tokio::test]
    async fn s3_artifact_with_retry() -> Result<()> {
        // statuses [500, 200]: presumably one failure before success, exercising the retry path
        let server = FakeDataServer::new(true, &[500, 200]);
        let logger = Logger::default();
        let queue_service = FakeQueueService {
            logger: logger.clone(),
            response: json!({
                "storageType": "s3",
                "url": server.data_url(),
            }),
        };

        let mut writer_factory = CursorWriterFactory::new();
        let content_type = download_artifact_impl(
            TASK_ID,
            Some("1"),
            ARTIFACT_NAME,
            &Retry::default(),
            &queue_service,
            unused_object_service_factory,
            &mut writer_factory,
        )
        .await?;

        logger.assert(vec![format!("artifact {} 1 {}", TASK_ID, ARTIFACT_NAME)]);
        assert_eq!(&content_type, "text/plain");
        assert_eq!(&writer_factory.into_inner(), b"hello, world");

        Ok(())
    }

    #[tokio::test]
    async fn s3_latest_artifact_with_retry() -> Result<()> {
        let server = FakeDataServer::new(true, &[500, 200]);
        let logger = Logger::default();
        let queue_service = FakeQueueService {
            logger: logger.clone(),
            response: json!({
                "storageType": "s3",
                "url": server.data_url(),
            }),
        };

        // no run_id, so the latest run must be requested
        let mut writer_factory = CursorWriterFactory::new();
        let content_type = download_artifact_impl(
            TASK_ID,
            None,
            ARTIFACT_NAME,
            &Retry::default(),
            &queue_service,
            unused_object_service_factory,
            &mut writer_factory,
        )
        .await?;

        logger.assert(vec![format!("latestArtifact {} {}", TASK_ID, ARTIFACT_NAME)]);
        assert_eq!(&content_type, "text/plain");
        assert_eq!(&writer_factory.into_inner(), b"hello, world");

        Ok(())
    }

    #[tokio::test]
    async fn object_artifact() -> Result<()> {
        let server = FakeDataServer::new(false, &[200]);
        let logger = Logger::default();
        let queue_service = FakeQueueService {
            logger: logger.clone(),
            response: json!({
                "storageType": "object",
                "name": "artifacts/data",
                "credentials": {
                    "client_id": "c",
                    "access_token": "a",
                    "certificate": "cert",
                },
            }),
        };

        // fake factory: checks the credentials it receives, then serves the fake data URL
        let object_service_factory = {
            let service_logger = logger.clone();
            let data_url = server.data_url();
            move |_queue: &FakeQueueService, creds: Credentials, _retry: &Retry| {
                assert_eq!(creds.client_id, "c");
                assert_eq!(creds.access_token, "a");
                assert_eq!(creds.certificate, Some("cert".to_owned()));
                Ok(FakeObjectService {
                    logger: service_logger,
                    response: json!({
                        "method": "getUrl",
                        "url": data_url,
                        "hashes": {
                            "sha256": DATA_SHA256,
                        },
                        "expires": Utc::now() + Duration::hours(2),
                    }),
                })
            }
        };

        let mut writer_factory = CursorWriterFactory::new();
        let content_type = download_artifact_impl(
            TASK_ID,
            Some("2"),
            ARTIFACT_NAME,
            &Retry::default(),
            &queue_service,
            object_service_factory,
            &mut writer_factory,
        )
        .await?;

        logger.assert(vec![
            format!("artifact {} 2 {}", TASK_ID, ARTIFACT_NAME),
            r#"startDownload artifacts/data {"getUrl":true}"#.to_owned(),
        ]);
        assert_eq!(&content_type, "text/plain");
        assert_eq!(&writer_factory.into_inner(), b"hello, world");

        Ok(())
    }

    #[tokio::test]
    async fn object_artifact_no_cert() -> Result<()> {
        let server = FakeDataServer::new(false, &[200]);
        let logger = Logger::default();
        let queue_service = FakeQueueService {
            logger: logger.clone(),
            response: json!({
                "storageType": "object",
                "name": "artifacts/data",
                "credentials": {
                    "client_id": "c",
                    "access_token": "a",
                    // no certificate
                },
            }),
        };

        // fake factory: the credentials must arrive without a certificate
        let object_service_factory = {
            let service_logger = logger.clone();
            let data_url = server.data_url();
            move |_queue: &FakeQueueService, creds: Credentials, _retry: &Retry| {
                assert_eq!(creds.client_id, "c");
                assert_eq!(creds.access_token, "a");
                assert_eq!(creds.certificate, None);
                Ok(FakeObjectService {
                    logger: service_logger,
                    response: json!({
                        "method": "getUrl",
                        "url": data_url,
                        "hashes": {
                            "sha256": DATA_SHA256,
                        },
                        "expires": Utc::now() + Duration::hours(2),
                    }),
                })
            }
        };

        let mut writer_factory = CursorWriterFactory::new();
        let content_type = download_artifact_impl(
            TASK_ID,
            Some("2"),
            ARTIFACT_NAME,
            &Retry::default(),
            &queue_service,
            object_service_factory,
            &mut writer_factory,
        )
        .await?;

        logger.assert(vec![
            format!("artifact {} 2 {}", TASK_ID, ARTIFACT_NAME),
            r#"startDownload artifacts/data {"getUrl":true}"#.to_owned(),
        ]);
        assert_eq!(&content_type, "text/plain");
        assert_eq!(&writer_factory.into_inner(), b"hello, world");

        Ok(())
    }

    #[tokio::test]
    async fn error_artifact() -> Result<()> {
        let logger = Logger::default();
        let queue_service = FakeQueueService {
            logger: logger.clone(),
            response: json!({
                "storageType": "error",
                "message": "uhoh",
                "reason": "test case",
            }),
        };

        let mut writer_factory = CursorWriterFactory::new();
        let outcome = download_artifact_impl(
            TASK_ID,
            None,
            ARTIFACT_NAME,
            &Retry::default(),
            &queue_service,
            unused_object_service_factory,
            &mut writer_factory,
        )
        .await;

        logger.assert(vec![format!("latestArtifact {} {}", TASK_ID, ARTIFACT_NAME)]);

        // the message is the outer context, the reason is the root cause
        let err = outcome.expect_err("Should have returned an Err");
        assert_eq!(err.to_string(), "Error Artifact: uhoh");
        assert_eq!(err.root_cause().to_string(), "test case");

        Ok(())
    }
}