1use std::fs::File;
3use std::future::Future;
4use std::io::{Seek, Write};
5use std::path::Path;
6use std::pin::Pin;
7use std::task::Poll;
8use std::thread;
9
10use flate2::{Compression, GzBuilder};
11use futures::{future::BoxFuture, AsyncWrite, FutureExt};
12use reqwest::Body;
13use tokio::fs::File as AsyncFile;
14use tokio::sync::mpsc::{self, error::SendError};
15use tokio::sync::oneshot;
16use tokio_util::io::ReaderStream;
17use tracing::{error, warn};
18
19use crate::{
20 client::{ArtifactFormat, ArtifactInfo, Client, JobArtifact},
21 JobResult,
22};
23
24const DEFAULT_ARTIFACT_NAME: &str = "default";
25
26#[derive(Debug)]
27enum UploadRequest {
28 NewFile(String, oneshot::Sender<std::io::Result<()>>),
29 WriteData(Vec<u8>, oneshot::Sender<std::io::Result<()>>),
30 Finish(oneshot::Sender<std::io::Result<File>>),
31}
32
33enum UploadFileState<'a> {
34 Idle,
35 Writing(
36 Option<BoxFuture<'a, Result<(), SendError<UploadRequest>>>>,
37 oneshot::Receiver<std::io::Result<()>>,
38 ),
39}
40
41fn zip_thread(temp: File, mut rx: mpsc::Receiver<UploadRequest>) {
42 let mut zip = zip::ZipWriter::new(temp);
43 let options =
44 zip::write::SimpleFileOptions::default().compression_method(zip::CompressionMethod::Stored);
45
46 loop {
47 if let Some(request) = rx.blocking_recv() {
48 match request {
49 UploadRequest::NewFile(s, tx) => {
50 let r = zip.start_file(s, options);
51 tx.send(r.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)))
52 .expect("Couldn't send reply");
53 }
54 UploadRequest::WriteData(v, tx) => {
55 let r = zip.write(&v);
56 tx.send(r.and(Ok(()))).expect("Couldn't send reply");
57 }
58 UploadRequest::Finish(tx) => {
59 let r = zip.finish();
60 let reply = match r {
61 Ok(mut file) => file.rewind().map(|()| file),
62 Err(e) => Err(std::io::Error::new(std::io::ErrorKind::Other, e)),
63 };
64 tx.send(reply).expect("Couldn't send finished zip");
65 return;
66 }
67 }
68 } else {
69 return;
70 }
71 }
72}
73
74fn gzip_thread(mut temp: File, mut rx: mpsc::Receiver<UploadRequest>) {
75 let mut gz = match rx.blocking_recv() {
76 Some(UploadRequest::NewFile(s, tx)) => {
77 tx.send(Ok(())).expect("Couldn't send reply");
78 GzBuilder::new()
79 .filename(s)
80 .write(&mut temp, Compression::default())
81 }
82 Some(UploadRequest::WriteData(_, tx)) => {
83 tx.send(Err(std::io::Error::new(
84 std::io::ErrorKind::Other,
85 "no file open",
86 )))
87 .expect("Couldn't send reply");
88 return;
89 }
90 Some(UploadRequest::Finish(tx)) => {
91 tx.send(Err(std::io::Error::new(
92 std::io::ErrorKind::Other,
93 "no file open",
94 )))
95 .expect("Couldn't send reply");
96 return;
97 }
98 None => {
99 return;
100 }
101 };
102
103 loop {
104 if let Some(request) = rx.blocking_recv() {
105 match request {
106 UploadRequest::NewFile(_, tx) => {
107 tx.send(Err(std::io::Error::new(
108 std::io::ErrorKind::Other,
109 "multiple files not permitted in gzip",
110 )))
111 .expect("Couldn't send reply");
112 return;
113 }
114 UploadRequest::WriteData(v, tx) => {
115 let r = gz.write(&v);
116 tx.send(r.and(Ok(()))).expect("Couldn't send reply");
117 }
118 UploadRequest::Finish(tx) => {
119 let r = gz.finish();
120 let reply = match r {
121 Ok(_) => temp.rewind().map(|()| temp),
122 Err(e) => Err(std::io::Error::new(std::io::ErrorKind::Other, e)),
123 };
124 tx.send(reply).expect("Couldn't send finished gzip");
125 return;
126 }
127 }
128 } else {
129 return;
130 }
131 }
132}
133
134pub struct UploadFile<'a> {
136 tx: &'a mpsc::Sender<UploadRequest>,
137 state: UploadFileState<'a>,
138}
139
140impl AsyncWrite for UploadFile<'_> {
141 fn poll_write(
142 self: std::pin::Pin<&mut Self>,
143 cx: &mut std::task::Context<'_>,
144 buf: &[u8],
145 ) -> std::task::Poll<std::io::Result<usize>> {
146 let this = self.get_mut();
147 loop {
148 match this.state {
149 UploadFileState::Idle => {
150 let (tx, rx) = oneshot::channel();
151 let send = this
152 .tx
153 .send(UploadRequest::WriteData(Vec::from(buf), tx))
154 .boxed();
155 this.state = UploadFileState::Writing(Some(send), rx)
156 }
157 UploadFileState::Writing(ref mut send, ref mut rx) => {
158 if let Some(f) = send {
159 let _r = futures::ready!(f.as_mut().poll(cx));
161 *send = None;
162 } else {
163 let _r = futures::ready!(Pin::new(rx).poll(cx));
164 this.state = UploadFileState::Idle;
165 return Poll::Ready(Ok(buf.len()));
166 }
167 }
168 }
169 }
170 }
171
172 fn poll_flush(
173 self: std::pin::Pin<&mut Self>,
174 _cx: &mut std::task::Context<'_>,
175 ) -> std::task::Poll<std::io::Result<()>> {
176 Poll::Ready(Ok(()))
177 }
178
179 fn poll_close(
180 self: std::pin::Pin<&mut Self>,
181 _cx: &mut std::task::Context<'_>,
182 ) -> std::task::Poll<std::io::Result<()>> {
183 Poll::Ready(Ok(()))
184 }
185}
186
187fn make_artifact_name(base: Option<&str>, format: &ArtifactFormat) -> String {
188 let name = base.unwrap_or(DEFAULT_ARTIFACT_NAME);
189 match format {
190 ArtifactFormat::Zip => format!("{}.zip", name),
191 ArtifactFormat::Gzip => format!("{}.gz", name),
192 ArtifactFormat::Raw => unimplemented!("Raw artifacts are not supported."),
193 }
194}
195
196pub struct Uploader<'a> {
198 client: Client,
199 job_id: u64,
200 job_token: String,
201 artifact: &'a JobArtifact,
202 tx: mpsc::Sender<UploadRequest>,
203}
204
205impl<'a> Uploader<'a> {
206 pub(crate) fn new(
207 client: Client,
208 build_dir: &Path,
209 job_id: u64,
210 job_token: String,
211 artifact: &'a JobArtifact,
212 ) -> Result<Self, ()> {
213 let temp = tempfile::tempfile_in(build_dir)
214 .map_err(|e| warn!("Failed to create artifacts temp file: {:?}", e))?;
215
216 let (tx, rx) = mpsc::channel(2);
217 match artifact.artifact_format {
218 ArtifactFormat::Zip => {
219 thread::spawn(move || zip_thread(temp, rx));
220 }
221 ArtifactFormat::Gzip => {
222 thread::spawn(move || gzip_thread(temp, rx));
223 }
224 ArtifactFormat::Raw => {
225 error!("Raw artifacts are currently unsupported.");
226 return Err(());
227 }
228 }
229 Ok(Self {
230 client,
231 job_id,
232 job_token,
233 artifact,
234 tx,
235 })
236 }
237
238 pub(crate) async fn file(&mut self, name: String) -> Result<UploadFile<'_>, ()> {
240 let (tx, rx) = oneshot::channel();
241 self.tx
242 .send(UploadRequest::NewFile(name, tx))
243 .await
244 .expect("Failed to create file");
245 match rx.await {
246 Ok(Ok(())) => Ok(()),
247 Ok(Err(err)) => {
248 warn!("Failed to create compressed artifact file: {:?}", err);
249 Err(())
250 }
251 Err(_) => {
252 warn!("Failed to compress artifacts: thread died");
253 Err(())
254 }
255 }?;
256
257 Ok(UploadFile {
258 tx: &self.tx,
259 state: UploadFileState::Idle,
260 })
261 }
262
263 pub(crate) async fn upload(self) -> JobResult {
264 let (tx, rx) = oneshot::channel();
265 self.tx.send(UploadRequest::Finish(tx)).await.unwrap();
266 let file = AsyncFile::from_std(match rx.await {
267 Ok(Ok(file)) => Ok(file),
268 Ok(Err(err)) => {
269 warn!("Failed to compress artifacts: {:?}", err);
270 Err(())
271 }
272 Err(_) => {
273 warn!("Failed to compress artifacts: thread died");
274 Err(())
275 }
276 }?);
277
278 let reader = ReaderStream::new(file);
279 self.client
280 .upload_artifact(
281 self.job_id,
282 &self.job_token,
283 ArtifactInfo {
284 name: &make_artifact_name(
285 self.artifact.name.as_deref(),
286 &self.artifact.artifact_format,
287 ),
288 artifact_format: &self.artifact.artifact_format.to_string(),
289 artifact_type: &self.artifact.artifact_type,
290 expire_in: self.artifact.expire_in.as_deref(),
291 },
292 Body::wrap_stream(reader),
293 )
294 .await
295 .map_err(|e| {
296 warn!("Failed to upload artifacts: {:?}", e);
297 })
298 }
299}