gitlab_runner/
uploader.rs

1//! Helpers to upload to gitlab
2use std::fs::File;
3use std::future::Future;
4use std::io::{Seek, Write};
5use std::path::Path;
6use std::pin::Pin;
7use std::task::Poll;
8use std::thread;
9
10use flate2::{Compression, GzBuilder};
11use futures::{future::BoxFuture, AsyncWrite, FutureExt};
12use reqwest::Body;
13use tokio::fs::File as AsyncFile;
14use tokio::sync::mpsc::{self, error::SendError};
15use tokio::sync::oneshot;
16use tokio_util::io::ReaderStream;
17use tracing::{error, warn};
18
19use crate::{
20    client::{ArtifactFormat, ArtifactInfo, Client, JobArtifact},
21    JobResult,
22};
23
24const DEFAULT_ARTIFACT_NAME: &str = "default";
25
26#[derive(Debug)]
27enum UploadRequest {
28    NewFile(String, oneshot::Sender<std::io::Result<()>>),
29    WriteData(Vec<u8>, oneshot::Sender<std::io::Result<()>>),
30    Finish(oneshot::Sender<std::io::Result<File>>),
31}
32
33enum UploadFileState<'a> {
34    Idle,
35    Writing(
36        Option<BoxFuture<'a, Result<(), SendError<UploadRequest>>>>,
37        oneshot::Receiver<std::io::Result<()>>,
38    ),
39}
40
41fn zip_thread(temp: File, mut rx: mpsc::Receiver<UploadRequest>) {
42    let mut zip = zip::ZipWriter::new(temp);
43    let options =
44        zip::write::SimpleFileOptions::default().compression_method(zip::CompressionMethod::Stored);
45
46    loop {
47        if let Some(request) = rx.blocking_recv() {
48            match request {
49                UploadRequest::NewFile(s, tx) => {
50                    let r = zip.start_file(s, options);
51                    tx.send(r.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)))
52                        .expect("Couldn't send reply");
53                }
54                UploadRequest::WriteData(v, tx) => {
55                    let r = zip.write(&v);
56                    tx.send(r.and(Ok(()))).expect("Couldn't send reply");
57                }
58                UploadRequest::Finish(tx) => {
59                    let r = zip.finish();
60                    let reply = match r {
61                        Ok(mut file) => file.rewind().map(|()| file),
62                        Err(e) => Err(std::io::Error::new(std::io::ErrorKind::Other, e)),
63                    };
64                    tx.send(reply).expect("Couldn't send finished zip");
65                    return;
66                }
67            }
68        } else {
69            return;
70        }
71    }
72}
73
74fn gzip_thread(mut temp: File, mut rx: mpsc::Receiver<UploadRequest>) {
75    let mut gz = match rx.blocking_recv() {
76        Some(UploadRequest::NewFile(s, tx)) => {
77            tx.send(Ok(())).expect("Couldn't send reply");
78            GzBuilder::new()
79                .filename(s)
80                .write(&mut temp, Compression::default())
81        }
82        Some(UploadRequest::WriteData(_, tx)) => {
83            tx.send(Err(std::io::Error::new(
84                std::io::ErrorKind::Other,
85                "no file open",
86            )))
87            .expect("Couldn't send reply");
88            return;
89        }
90        Some(UploadRequest::Finish(tx)) => {
91            tx.send(Err(std::io::Error::new(
92                std::io::ErrorKind::Other,
93                "no file open",
94            )))
95            .expect("Couldn't send reply");
96            return;
97        }
98        None => {
99            return;
100        }
101    };
102
103    loop {
104        if let Some(request) = rx.blocking_recv() {
105            match request {
106                UploadRequest::NewFile(_, tx) => {
107                    tx.send(Err(std::io::Error::new(
108                        std::io::ErrorKind::Other,
109                        "multiple files not permitted in gzip",
110                    )))
111                    .expect("Couldn't send reply");
112                    return;
113                }
114                UploadRequest::WriteData(v, tx) => {
115                    let r = gz.write(&v);
116                    tx.send(r.and(Ok(()))).expect("Couldn't send reply");
117                }
118                UploadRequest::Finish(tx) => {
119                    let r = gz.finish();
120                    let reply = match r {
121                        Ok(_) => temp.rewind().map(|()| temp),
122                        Err(e) => Err(std::io::Error::new(std::io::ErrorKind::Other, e)),
123                    };
124                    tx.send(reply).expect("Couldn't send finished gzip");
125                    return;
126                }
127            }
128        } else {
129            return;
130        }
131    }
132}
133
134/// Single file to be uploaded
135pub struct UploadFile<'a> {
136    tx: &'a mpsc::Sender<UploadRequest>,
137    state: UploadFileState<'a>,
138}
139
140impl AsyncWrite for UploadFile<'_> {
141    fn poll_write(
142        self: std::pin::Pin<&mut Self>,
143        cx: &mut std::task::Context<'_>,
144        buf: &[u8],
145    ) -> std::task::Poll<std::io::Result<usize>> {
146        let this = self.get_mut();
147        loop {
148            match this.state {
149                UploadFileState::Idle => {
150                    let (tx, rx) = oneshot::channel();
151                    let send = this
152                        .tx
153                        .send(UploadRequest::WriteData(Vec::from(buf), tx))
154                        .boxed();
155                    this.state = UploadFileState::Writing(Some(send), rx)
156                }
157                UploadFileState::Writing(ref mut send, ref mut rx) => {
158                    if let Some(f) = send {
159                        // TODO error handling
160                        let _r = futures::ready!(f.as_mut().poll(cx));
161                        *send = None;
162                    } else {
163                        let _r = futures::ready!(Pin::new(rx).poll(cx));
164                        this.state = UploadFileState::Idle;
165                        return Poll::Ready(Ok(buf.len()));
166                    }
167                }
168            }
169        }
170    }
171
172    fn poll_flush(
173        self: std::pin::Pin<&mut Self>,
174        _cx: &mut std::task::Context<'_>,
175    ) -> std::task::Poll<std::io::Result<()>> {
176        Poll::Ready(Ok(()))
177    }
178
179    fn poll_close(
180        self: std::pin::Pin<&mut Self>,
181        _cx: &mut std::task::Context<'_>,
182    ) -> std::task::Poll<std::io::Result<()>> {
183        Poll::Ready(Ok(()))
184    }
185}
186
187fn make_artifact_name(base: Option<&str>, format: &ArtifactFormat) -> String {
188    let name = base.unwrap_or(DEFAULT_ARTIFACT_NAME);
189    match format {
190        ArtifactFormat::Zip => format!("{}.zip", name),
191        ArtifactFormat::Gzip => format!("{}.gz", name),
192        ArtifactFormat::Raw => unimplemented!("Raw artifacts are not supported."),
193    }
194}
195
196/// An upload to gitlab
197pub struct Uploader<'a> {
198    client: Client,
199    job_id: u64,
200    job_token: String,
201    artifact: &'a JobArtifact,
202    tx: mpsc::Sender<UploadRequest>,
203}
204
205impl<'a> Uploader<'a> {
206    pub(crate) fn new(
207        client: Client,
208        build_dir: &Path,
209        job_id: u64,
210        job_token: String,
211        artifact: &'a JobArtifact,
212    ) -> Result<Self, ()> {
213        let temp = tempfile::tempfile_in(build_dir)
214            .map_err(|e| warn!("Failed to create artifacts temp file: {:?}", e))?;
215
216        let (tx, rx) = mpsc::channel(2);
217        match artifact.artifact_format {
218            ArtifactFormat::Zip => {
219                thread::spawn(move || zip_thread(temp, rx));
220            }
221            ArtifactFormat::Gzip => {
222                thread::spawn(move || gzip_thread(temp, rx));
223            }
224            ArtifactFormat::Raw => {
225                error!("Raw artifacts are currently unsupported.");
226                return Err(());
227            }
228        }
229        Ok(Self {
230            client,
231            job_id,
232            job_token,
233            artifact,
234            tx,
235        })
236    }
237
238    /// Create a new file to be uploaded
239    pub(crate) async fn file(&mut self, name: String) -> Result<UploadFile<'_>, ()> {
240        let (tx, rx) = oneshot::channel();
241        self.tx
242            .send(UploadRequest::NewFile(name, tx))
243            .await
244            .expect("Failed to create file");
245        match rx.await {
246            Ok(Ok(())) => Ok(()),
247            Ok(Err(err)) => {
248                warn!("Failed to create compressed artifact file: {:?}", err);
249                Err(())
250            }
251            Err(_) => {
252                warn!("Failed to compress artifacts: thread died");
253                Err(())
254            }
255        }?;
256
257        Ok(UploadFile {
258            tx: &self.tx,
259            state: UploadFileState::Idle,
260        })
261    }
262
263    pub(crate) async fn upload(self) -> JobResult {
264        let (tx, rx) = oneshot::channel();
265        self.tx.send(UploadRequest::Finish(tx)).await.unwrap();
266        let file = AsyncFile::from_std(match rx.await {
267            Ok(Ok(file)) => Ok(file),
268            Ok(Err(err)) => {
269                warn!("Failed to compress artifacts: {:?}", err);
270                Err(())
271            }
272            Err(_) => {
273                warn!("Failed to compress artifacts: thread died");
274                Err(())
275            }
276        }?);
277
278        let reader = ReaderStream::new(file);
279        self.client
280            .upload_artifact(
281                self.job_id,
282                &self.job_token,
283                ArtifactInfo {
284                    name: &make_artifact_name(
285                        self.artifact.name.as_deref(),
286                        &self.artifact.artifact_format,
287                    ),
288                    artifact_format: &self.artifact.artifact_format.to_string(),
289                    artifact_type: &self.artifact.artifact_type,
290                    expire_in: self.artifact.expire_in.as_deref(),
291                },
292                Body::wrap_stream(reader),
293            )
294            .await
295            .map_err(|e| {
296                warn!("Failed to upload artifacts: {:?}", e);
297            })
298    }
299}