1use std::fs::File;
3use std::future::Future;
4use std::io::{Seek, Write};
5use std::path::Path;
6use std::pin::Pin;
7use std::task::Poll;
8use std::thread;
9
10use flate2::{Compression, GzBuilder};
11use futures::{AsyncWrite, FutureExt, future::BoxFuture};
12use reqwest::Body;
13use tokio::fs::File as AsyncFile;
14use tokio::sync::mpsc::{self, error::SendError};
15use tokio::sync::oneshot;
16use tokio_util::io::ReaderStream;
17use tracing::{error, warn};
18
19use crate::{
20 JobResult,
21 client::{ArtifactFormat, ArtifactInfo, Client, JobArtifact},
22};
23
/// Base name used for the uploaded archive when the artifact has no name.
const DEFAULT_ARTIFACT_NAME: &str = "default";
25
/// Requests sent to the compression worker thread.
///
/// Every variant carries a oneshot sender on which the worker reports the
/// outcome of that single request.
#[derive(Debug)]
enum UploadRequest {
    /// Start a new file with the given name inside the archive.
    NewFile(String, oneshot::Sender<std::io::Result<()>>),
    /// Append the given bytes to the file that is currently open.
    WriteData(Vec<u8>, oneshot::Sender<std::io::Result<()>>),
    /// Finalise the archive; on success the reply carries the rewound
    /// temporary file holding the finished archive.
    Finish(oneshot::Sender<std::io::Result<File>>),
}
32
/// Progress of the write currently being driven by `UploadFile::poll_write`.
enum UploadFileState<'a> {
    /// No write is in flight; the next `poll_write` call issues a new request.
    Idle,
    /// A write request has been issued.
    ///
    /// The first field holds the future that queues the request on the worker
    /// channel; it is set to `None` once that future has completed. The second
    /// field receives the worker's reply for this request.
    Writing(
        Option<BoxFuture<'a, Result<(), SendError<UploadRequest>>>>,
        oneshot::Receiver<std::io::Result<()>>,
    ),
}
40
41fn zip_thread(temp: File, mut rx: mpsc::Receiver<UploadRequest>) {
42 let mut zip = zip::ZipWriter::new(temp);
43 let options =
44 zip::write::SimpleFileOptions::default().compression_method(zip::CompressionMethod::Stored);
45
46 loop {
47 if let Some(request) = rx.blocking_recv() {
48 match request {
49 UploadRequest::NewFile(s, tx) => {
50 let r = zip.start_file(s, options);
51 tx.send(r.map_err(std::io::Error::other))
52 .expect("Couldn't send reply");
53 }
54 UploadRequest::WriteData(v, tx) => {
55 let r = zip.write(&v);
56 tx.send(r.and(Ok(()))).expect("Couldn't send reply");
57 }
58 UploadRequest::Finish(tx) => {
59 let r = zip.finish();
60 let reply = match r {
61 Ok(mut file) => file.rewind().map(|()| file),
62 Err(e) => Err(std::io::Error::other(e)),
63 };
64 tx.send(reply).expect("Couldn't send finished zip");
65 return;
66 }
67 }
68 } else {
69 return;
70 }
71 }
72}
73
74fn gzip_thread(mut temp: File, mut rx: mpsc::Receiver<UploadRequest>) {
75 let mut gz = match rx.blocking_recv() {
76 Some(UploadRequest::NewFile(s, tx)) => {
77 tx.send(Ok(())).expect("Couldn't send reply");
78 GzBuilder::new()
79 .filename(s)
80 .write(&mut temp, Compression::default())
81 }
82 Some(UploadRequest::WriteData(_, tx)) => {
83 tx.send(Err(std::io::Error::other("no file open")))
84 .expect("Couldn't send reply");
85 return;
86 }
87 Some(UploadRequest::Finish(tx)) => {
88 tx.send(Err(std::io::Error::other("no file open")))
89 .expect("Couldn't send reply");
90 return;
91 }
92 None => {
93 return;
94 }
95 };
96
97 loop {
98 if let Some(request) = rx.blocking_recv() {
99 match request {
100 UploadRequest::NewFile(_, tx) => {
101 tx.send(Err(std::io::Error::other(
102 "multiple files not permitted in gzip",
103 )))
104 .expect("Couldn't send reply");
105 return;
106 }
107 UploadRequest::WriteData(v, tx) => {
108 let r = gz.write(&v);
109 tx.send(r.and(Ok(()))).expect("Couldn't send reply");
110 }
111 UploadRequest::Finish(tx) => {
112 let r = gz.finish();
113 let reply = match r {
114 Ok(_) => temp.rewind().map(|()| temp),
115 Err(e) => Err(std::io::Error::other(e)),
116 };
117 tx.send(reply).expect("Couldn't send finished gzip");
118 return;
119 }
120 }
121 } else {
122 return;
123 }
124 }
125}
126
/// Writer for the contents of one file inside the artifact archive.
///
/// Data written through its `AsyncWrite` implementation is forwarded as
/// `UploadRequest::WriteData` messages over `tx`.
pub struct UploadFile<'a> {
    /// Request channel to the compression worker, borrowed from the `Uploader`.
    tx: &'a mpsc::Sender<UploadRequest>,
    /// Progress of the write currently in flight, if any.
    state: UploadFileState<'a>,
}
132
133impl AsyncWrite for UploadFile<'_> {
134 fn poll_write(
135 self: std::pin::Pin<&mut Self>,
136 cx: &mut std::task::Context<'_>,
137 buf: &[u8],
138 ) -> std::task::Poll<std::io::Result<usize>> {
139 let this = self.get_mut();
140 loop {
141 match this.state {
142 UploadFileState::Idle => {
143 let (tx, rx) = oneshot::channel();
144 let send = this
145 .tx
146 .send(UploadRequest::WriteData(Vec::from(buf), tx))
147 .boxed();
148 this.state = UploadFileState::Writing(Some(send), rx)
149 }
150 UploadFileState::Writing(ref mut send, ref mut rx) => {
151 if let Some(f) = send {
152 let _r = futures::ready!(f.as_mut().poll(cx));
154 *send = None;
155 } else {
156 let _r = futures::ready!(Pin::new(rx).poll(cx));
157 this.state = UploadFileState::Idle;
158 return Poll::Ready(Ok(buf.len()));
159 }
160 }
161 }
162 }
163 }
164
165 fn poll_flush(
166 self: std::pin::Pin<&mut Self>,
167 _cx: &mut std::task::Context<'_>,
168 ) -> std::task::Poll<std::io::Result<()>> {
169 Poll::Ready(Ok(()))
170 }
171
172 fn poll_close(
173 self: std::pin::Pin<&mut Self>,
174 _cx: &mut std::task::Context<'_>,
175 ) -> std::task::Poll<std::io::Result<()>> {
176 Poll::Ready(Ok(()))
177 }
178}
179
180fn make_artifact_name(base: Option<&str>, format: &ArtifactFormat) -> String {
181 let name = base.unwrap_or(DEFAULT_ARTIFACT_NAME);
182 match format {
183 ArtifactFormat::Zip => format!("{name}.zip"),
184 ArtifactFormat::Gzip => format!("{name}.gz"),
185 ArtifactFormat::Raw => unimplemented!("Raw artifacts are not supported."),
186 }
187}
188
/// Collects files into one compressed archive and uploads it as a job
/// artifact.
pub struct Uploader<'a> {
    /// Client used to perform the upload.
    client: Client,
    /// Job id passed to the client when uploading.
    job_id: u64,
    /// Job token passed to the client when uploading.
    job_token: String,
    /// Artifact description supplying name, format, type and expiry.
    artifact: &'a JobArtifact,
    /// Request channel to the compression worker thread.
    tx: mpsc::Sender<UploadRequest>,
}
197
198impl<'a> Uploader<'a> {
199 pub(crate) fn new(
200 client: Client,
201 build_dir: &Path,
202 job_id: u64,
203 job_token: String,
204 artifact: &'a JobArtifact,
205 ) -> Result<Self, ()> {
206 let temp = tempfile::tempfile_in(build_dir)
207 .map_err(|e| warn!("Failed to create artifacts temp file: {:?}", e))?;
208
209 let (tx, rx) = mpsc::channel(2);
210 match artifact.artifact_format {
211 ArtifactFormat::Zip => {
212 thread::spawn(move || zip_thread(temp, rx));
213 }
214 ArtifactFormat::Gzip => {
215 thread::spawn(move || gzip_thread(temp, rx));
216 }
217 ArtifactFormat::Raw => {
218 error!("Raw artifacts are currently unsupported.");
219 return Err(());
220 }
221 }
222 Ok(Self {
223 client,
224 job_id,
225 job_token,
226 artifact,
227 tx,
228 })
229 }
230
231 pub(crate) async fn file(&mut self, name: String) -> Result<UploadFile<'_>, ()> {
233 let (tx, rx) = oneshot::channel();
234 self.tx
235 .send(UploadRequest::NewFile(name, tx))
236 .await
237 .expect("Failed to create file");
238 match rx.await {
239 Ok(Ok(())) => Ok(()),
240 Ok(Err(err)) => {
241 warn!("Failed to create compressed artifact file: {:?}", err);
242 Err(())
243 }
244 Err(_) => {
245 warn!("Failed to compress artifacts: thread died");
246 Err(())
247 }
248 }?;
249
250 Ok(UploadFile {
251 tx: &self.tx,
252 state: UploadFileState::Idle,
253 })
254 }
255
256 pub(crate) async fn upload(self) -> JobResult {
257 let (tx, rx) = oneshot::channel();
258 self.tx.send(UploadRequest::Finish(tx)).await.unwrap();
259 let file = AsyncFile::from_std(match rx.await {
260 Ok(Ok(file)) => Ok(file),
261 Ok(Err(err)) => {
262 warn!("Failed to compress artifacts: {:?}", err);
263 Err(())
264 }
265 Err(_) => {
266 warn!("Failed to compress artifacts: thread died");
267 Err(())
268 }
269 }?);
270
271 let reader = ReaderStream::new(file);
272 self.client
273 .upload_artifact(
274 self.job_id,
275 &self.job_token,
276 ArtifactInfo {
277 name: &make_artifact_name(
278 self.artifact.name.as_deref(),
279 &self.artifact.artifact_format,
280 ),
281 artifact_format: &self.artifact.artifact_format.to_string(),
282 artifact_type: &self.artifact.artifact_type,
283 expire_in: self.artifact.expire_in.as_deref(),
284 },
285 Body::wrap_stream(reader),
286 )
287 .await
288 .map_err(|e| {
289 warn!("Failed to upload artifacts: {:?}", e);
290 })
291 }
292}