Skip to main content

gitlab_runner/
uploader.rs

1//! Helpers to upload to gitlab
2use std::fs::File;
3use std::future::Future;
4use std::io::{Seek, Write};
5use std::path::Path;
6use std::pin::Pin;
7use std::task::Poll;
8use std::thread;
9
10use flate2::{Compression, GzBuilder};
11use futures::{AsyncWrite, FutureExt, future::BoxFuture};
12use reqwest::Body;
13use tokio::fs::File as AsyncFile;
14use tokio::sync::mpsc::{self, error::SendError};
15use tokio::sync::oneshot;
16use tokio_util::io::ReaderStream;
17use tracing::{error, warn};
18
19use crate::{
20    JobResult,
21    client::{ArtifactFormat, ArtifactInfo, Client, JobArtifact},
22};
23
/// Base name used for the uploaded artifact when the job's artifact entry has no name.
24const DEFAULT_ARTIFACT_NAME: &str = "default";
25
/// Requests sent over the mpsc channel to the compression worker thread.
///
/// Every variant carries a oneshot sender on which the worker reports the
/// outcome of that request.
26#[derive(Debug)]
27enum UploadRequest {
    /// Start a new entry with the given name (zip: a new archive member;
    /// gzip: the single file, whose name is stored in the gzip header).
28    NewFile(String, oneshot::Sender<std::io::Result<()>>),
    /// Append the given bytes to the entry that is currently open.
29    WriteData(Vec<u8>, oneshot::Sender<std::io::Result<()>>),
    /// Finalize the archive; on success the reply is the backing file,
    /// rewound to its start. The worker exits after replying.
30    Finish(oneshot::Sender<std::io::Result<File>>),
31}
32
/// Progress of the write issued by `UploadFile::poll_write`.
33enum UploadFileState<'a> {
    /// No write request is in flight.
34    Idle,
    /// A write request has been created and is awaiting completion.
    ///
    /// The first field is the future that queues the request onto the channel;
    /// it is set to `None` once that future has completed. The second field
    /// receives the worker's reply for the request.
35    Writing(
36        Option<BoxFuture<'a, Result<(), SendError<UploadRequest>>>>,
37        oneshot::Receiver<std::io::Result<()>>,
38    ),
39}
40
41fn zip_thread(temp: File, mut rx: mpsc::Receiver<UploadRequest>) {
42    let mut zip = zip::ZipWriter::new(temp);
43    let options =
44        zip::write::SimpleFileOptions::default().compression_method(zip::CompressionMethod::Stored);
45
46    loop {
47        if let Some(request) = rx.blocking_recv() {
48            match request {
49                UploadRequest::NewFile(s, tx) => {
50                    let r = zip.start_file(s, options);
51                    tx.send(r.map_err(std::io::Error::other))
52                        .expect("Couldn't send reply");
53                }
54                UploadRequest::WriteData(v, tx) => {
55                    let r = zip.write(&v);
56                    tx.send(r.and(Ok(()))).expect("Couldn't send reply");
57                }
58                UploadRequest::Finish(tx) => {
59                    let r = zip.finish();
60                    let reply = match r {
61                        Ok(mut file) => file.rewind().map(|()| file),
62                        Err(e) => Err(std::io::Error::other(e)),
63                    };
64                    tx.send(reply).expect("Couldn't send finished zip");
65                    return;
66                }
67            }
68        } else {
69            return;
70        }
71    }
72}
73
74fn gzip_thread(mut temp: File, mut rx: mpsc::Receiver<UploadRequest>) {
75    let mut gz = match rx.blocking_recv() {
76        Some(UploadRequest::NewFile(s, tx)) => {
77            tx.send(Ok(())).expect("Couldn't send reply");
78            GzBuilder::new()
79                .filename(s)
80                .write(&mut temp, Compression::default())
81        }
82        Some(UploadRequest::WriteData(_, tx)) => {
83            tx.send(Err(std::io::Error::other("no file open")))
84                .expect("Couldn't send reply");
85            return;
86        }
87        Some(UploadRequest::Finish(tx)) => {
88            tx.send(Err(std::io::Error::other("no file open")))
89                .expect("Couldn't send reply");
90            return;
91        }
92        None => {
93            return;
94        }
95    };
96
97    loop {
98        if let Some(request) = rx.blocking_recv() {
99            match request {
100                UploadRequest::NewFile(_, tx) => {
101                    tx.send(Err(std::io::Error::other(
102                        "multiple files not permitted in gzip",
103                    )))
104                    .expect("Couldn't send reply");
105                    return;
106                }
107                UploadRequest::WriteData(v, tx) => {
108                    let r = gz.write(&v);
109                    tx.send(r.and(Ok(()))).expect("Couldn't send reply");
110                }
111                UploadRequest::Finish(tx) => {
112                    let r = gz.finish();
113                    let reply = match r {
114                        Ok(_) => temp.rewind().map(|()| temp),
115                        Err(e) => Err(std::io::Error::other(e)),
116                    };
117                    tx.send(reply).expect("Couldn't send finished gzip");
118                    return;
119                }
120            }
121        } else {
122            return;
123        }
124    }
125}
126
127/// Single file to be uploaded
///
/// Writes made through its `AsyncWrite` implementation are forwarded as
/// `UploadRequest::WriteData` messages over the borrowed channel.
128pub struct UploadFile<'a> {
    /// Channel on which write requests are sent.
129    tx: &'a mpsc::Sender<UploadRequest>,
    /// Progress of the write request currently in flight, if any.
130    state: UploadFileState<'a>,
131}
132
133impl AsyncWrite for UploadFile<'_> {
134    fn poll_write(
135        self: std::pin::Pin<&mut Self>,
136        cx: &mut std::task::Context<'_>,
137        buf: &[u8],
138    ) -> std::task::Poll<std::io::Result<usize>> {
139        let this = self.get_mut();
140        loop {
141            match this.state {
142                UploadFileState::Idle => {
143                    let (tx, rx) = oneshot::channel();
144                    let send = this
145                        .tx
146                        .send(UploadRequest::WriteData(Vec::from(buf), tx))
147                        .boxed();
148                    this.state = UploadFileState::Writing(Some(send), rx)
149                }
150                UploadFileState::Writing(ref mut send, ref mut rx) => {
151                    if let Some(f) = send {
152                        // TODO error handling
153                        let _r = futures::ready!(f.as_mut().poll(cx));
154                        *send = None;
155                    } else {
156                        let _r = futures::ready!(Pin::new(rx).poll(cx));
157                        this.state = UploadFileState::Idle;
158                        return Poll::Ready(Ok(buf.len()));
159                    }
160                }
161            }
162        }
163    }
164
165    fn poll_flush(
166        self: std::pin::Pin<&mut Self>,
167        _cx: &mut std::task::Context<'_>,
168    ) -> std::task::Poll<std::io::Result<()>> {
169        Poll::Ready(Ok(()))
170    }
171
172    fn poll_close(
173        self: std::pin::Pin<&mut Self>,
174        _cx: &mut std::task::Context<'_>,
175    ) -> std::task::Poll<std::io::Result<()>> {
176        Poll::Ready(Ok(()))
177    }
178}
179
180fn make_artifact_name(base: Option<&str>, format: &ArtifactFormat) -> String {
181    let name = base.unwrap_or(DEFAULT_ARTIFACT_NAME);
182    match format {
183        ArtifactFormat::Zip => format!("{name}.zip"),
184        ArtifactFormat::Gzip => format!("{name}.gz"),
185        ArtifactFormat::Raw => unimplemented!("Raw artifacts are not supported."),
186    }
187}
188
189/// An upload to gitlab
///
/// Files added through `file` are sent to a background compression thread;
/// `upload` finalizes the archive and sends it through `client`.
190pub struct Uploader<'a> {
    /// Client used to perform the artifact upload.
191    client: Client,
    /// Id of the job the artifact is uploaded for.
192    job_id: u64,
    /// Job token passed along with the upload.
193    job_token: String,
    /// Artifact description supplying name, format, type and expiry.
194    artifact: &'a JobArtifact,
    /// Channel to the compression thread.
195    tx: mpsc::Sender<UploadRequest>,
196}
197
198impl<'a> Uploader<'a> {
199    pub(crate) fn new(
200        client: Client,
201        build_dir: &Path,
202        job_id: u64,
203        job_token: String,
204        artifact: &'a JobArtifact,
205    ) -> Result<Self, ()> {
206        let temp = tempfile::tempfile_in(build_dir)
207            .map_err(|e| warn!("Failed to create artifacts temp file: {:?}", e))?;
208
209        let (tx, rx) = mpsc::channel(2);
210        match artifact.artifact_format {
211            ArtifactFormat::Zip => {
212                thread::spawn(move || zip_thread(temp, rx));
213            }
214            ArtifactFormat::Gzip => {
215                thread::spawn(move || gzip_thread(temp, rx));
216            }
217            ArtifactFormat::Raw => {
218                error!("Raw artifacts are currently unsupported.");
219                return Err(());
220            }
221        }
222        Ok(Self {
223            client,
224            job_id,
225            job_token,
226            artifact,
227            tx,
228        })
229    }
230
231    /// Create a new file to be uploaded
232    pub(crate) async fn file(&mut self, name: String) -> Result<UploadFile<'_>, ()> {
233        let (tx, rx) = oneshot::channel();
234        self.tx
235            .send(UploadRequest::NewFile(name, tx))
236            .await
237            .expect("Failed to create file");
238        match rx.await {
239            Ok(Ok(())) => Ok(()),
240            Ok(Err(err)) => {
241                warn!("Failed to create compressed artifact file: {:?}", err);
242                Err(())
243            }
244            Err(_) => {
245                warn!("Failed to compress artifacts: thread died");
246                Err(())
247            }
248        }?;
249
250        Ok(UploadFile {
251            tx: &self.tx,
252            state: UploadFileState::Idle,
253        })
254    }
255
256    pub(crate) async fn upload(self) -> JobResult {
257        let (tx, rx) = oneshot::channel();
258        self.tx.send(UploadRequest::Finish(tx)).await.unwrap();
259        let file = AsyncFile::from_std(match rx.await {
260            Ok(Ok(file)) => Ok(file),
261            Ok(Err(err)) => {
262                warn!("Failed to compress artifacts: {:?}", err);
263                Err(())
264            }
265            Err(_) => {
266                warn!("Failed to compress artifacts: thread died");
267                Err(())
268            }
269        }?);
270
271        let reader = ReaderStream::new(file);
272        self.client
273            .upload_artifact(
274                self.job_id,
275                &self.job_token,
276                ArtifactInfo {
277                    name: &make_artifact_name(
278                        self.artifact.name.as_deref(),
279                        &self.artifact.artifact_format,
280                    ),
281                    artifact_format: &self.artifact.artifact_format.to_string(),
282                    artifact_type: &self.artifact.artifact_type,
283                    expire_in: self.artifact.expire_in.as_deref(),
284                },
285                Body::wrap_stream(reader),
286            )
287            .await
288            .map_err(|e| {
289                warn!("Failed to upload artifacts: {:?}", e);
290            })
291    }
292}