1use crate::artifact::Artifact;
3use crate::client::{Client, JobArtifactFile, JobDependency, JobResponse, JobVariable};
4use crate::outputln;
5use bytes::{Bytes, BytesMut};
6use std::collections::HashMap;
7use std::path::{Path, PathBuf};
8use std::sync::{Arc, Mutex};
9use tokio::io::AsyncWrite;
10use tokio_retry2::strategy::{FibonacciBackoff, jitter};
11use tracing::info;
12
13use crate::client::Error as ClientError;
14
15pub struct Variable<'a> {
23 v: &'a JobVariable,
24}
25
26impl<'a> Variable<'a> {
27 pub fn key(&self) -> &'a str {
29 &self.v.key
30 }
31
32 pub fn value(&self) -> &'a str {
34 &self.v.value
35 }
36
37 pub fn masked(&self) -> bool {
39 self.v.masked
40 }
41
42 pub fn public(&self) -> bool {
44 self.v.public
45 }
46}
47
48impl std::fmt::Display for Variable<'_> {
49 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
50 if self.v.masked {
51 write!(f, "<MASKED>")
52 } else {
53 write!(f, "{}", self.v.value)
54 }
55 }
56}
57
58#[derive(Debug)]
64pub struct Dependency<'a> {
65 job: &'a Job,
66 dependency: &'a JobDependency,
67}
68
69fn is_retriable_error(err: &ClientError) -> bool {
70 match err {
71 ClientError::Request(_) => true,
72 ClientError::UnexpectedStatus(status) => status.is_server_error(),
73 _ => false,
74 }
75}
76
77impl Dependency<'_> {
78 pub fn id(&self) -> u64 {
82 self.dependency.id
83 }
84
85 pub fn name(&self) -> &str {
89 &self.dependency.name
90 }
91
92 pub fn artifact_filename(&self) -> Option<&str> {
94 self.dependency
95 .artifacts_file
96 .as_ref()
97 .map(|a| a.filename.as_str())
98 }
99
100 pub fn artifact_size(&self) -> Option<usize> {
102 self.dependency.artifacts_file.as_ref().map(|a| a.size)
103 }
104
105 async fn download_impl<A>(&self, mut writer: A) -> Result<(), ClientError>
106 where
107 A: AsyncWrite + Unpin,
108 {
109 let mut strategy = FibonacciBackoff::from_millis(900).map(jitter).take(4);
110 loop {
111 let r = self
112 .job
113 .client
114 .download_artifact(self.dependency.id, &self.dependency.token, &mut writer)
115 .await;
116
117 match (r, strategy.next()) {
118 (Err(err), Some(d)) if is_retriable_error(&err) => {
119 outputln!(
120 "Error getting artifacts from {}: {err}. Retrying",
121 self.dependency.name
122 );
123 tokio::time::sleep(d).await;
124 }
125 (r, _) => return r,
126 }
127 }
128 }
129
130 async fn download_to_file(&self, _file: &JobArtifactFile) -> Result<(), ClientError> {
131 let mut path = self.job.build_dir.join("artifacts");
132 if let Err(e) = tokio::fs::create_dir(&path).await
133 && e.kind() != std::io::ErrorKind::AlreadyExists
134 {
135 return Err(ClientError::WriteFailure(e));
136 }
137
138 path.push(format!("{}.zip", self.id()));
140 let mut f = tokio::fs::File::create(&path)
141 .await
142 .map_err(ClientError::WriteFailure)?;
143 self.download_impl(&mut f).await?;
144 self.job.artifacts.insert_file(self.dependency.id, path);
145 Ok(())
146 }
147
148 async fn download_to_mem(&self, file: &JobArtifactFile) -> Result<(), ClientError> {
149 let mut bytes = Vec::with_capacity(file.size);
150 self.download_impl(&mut bytes).await?;
151 self.job.artifacts.insert_data(self.dependency.id, bytes);
152 Ok(())
153 }
154
155 pub async fn download(&self) -> Result<Option<Artifact>, ClientError> {
160 if let Some(file) = &self.dependency.artifacts_file {
161 let cached = self.job.artifacts.get(self.dependency.id).await?;
162 if cached.is_some() {
163 return Ok(cached);
164 }
165
166 if file.size > 64 * 1024 {
169 info!("Downloading dependency {} to file", self.dependency.id);
170 self.download_to_file(file).await?
171 } else {
172 info!("Downloading dependency {} to mem", self.dependency.id);
173 self.download_to_mem(file).await?
174 }
175 self.job.artifacts.get(self.dependency.id).await
176 } else {
177 Ok(None)
178 }
179 }
180}
181
/// Cheaply clonable, shared byte buffer.
///
/// Wraps the `Arc` in a local type so `AsRef<[u8]>` can be implemented for
/// it, which lets it be used as the backing store of a `std::io::Cursor`.
#[derive(Clone, Debug)]
struct ArcU8(Arc<Vec<u8>>);

impl ArcU8 {
    /// Takes ownership of `data` and places it behind an `Arc`.
    fn new(data: Vec<u8>) -> Self {
        let shared = Arc::new(data);
        ArcU8(shared)
    }
}

impl AsRef<[u8]> for ArcU8 {
    /// Borrows the shared buffer as a byte slice.
    fn as_ref(&self) -> &[u8] {
        let ArcU8(shared) = self;
        shared.as_slice()
    }
}
196
197#[derive(Clone, Debug)]
198enum CacheData {
199 MemoryBacked(ArcU8),
200 FileBacked(PathBuf),
201}
202
203#[derive(Debug)]
204struct ArtifactCache {
205 data: Mutex<HashMap<u64, CacheData>>,
206}
207
208impl ArtifactCache {
209 fn new() -> Self {
210 ArtifactCache {
211 data: Mutex::new(HashMap::new()),
212 }
213 }
214
215 fn insert_file(&self, id: u64, path: PathBuf) {
216 let mut d = self.data.lock().unwrap();
217 d.insert(id, CacheData::FileBacked(path));
218 }
219
220 fn insert_data(&self, id: u64, data: Vec<u8>) {
221 let mut d = self.data.lock().unwrap();
222 d.insert(id, CacheData::MemoryBacked(ArcU8::new(data)));
223 }
224
225 fn lookup(&self, id: u64) -> Option<CacheData> {
226 let d = self.data.lock().unwrap();
227 d.get(&id).cloned()
228 }
229
230 async fn get(&self, id: u64) -> Result<Option<Artifact>, ClientError> {
231 if let Some(data) = self.lookup(id) {
232 match data {
233 CacheData::MemoryBacked(m) => {
234 Ok(Some(Artifact::new(Box::new(std::io::Cursor::new(m)))?))
235 }
236 CacheData::FileBacked(p) => {
237 let f = tokio::fs::File::open(p)
238 .await
239 .map_err(ClientError::WriteFailure)?;
240 Ok(Some(Artifact::new(Box::new(f.try_into_std().unwrap()))?))
242 }
243 }
244 } else {
245 Ok(None)
246 }
247 }
248}
249
250#[derive(Clone, Debug)]
251pub(crate) struct JobLog {
252 trace: Arc<Mutex<BytesMut>>,
253}
254
255impl JobLog {
256 pub(crate) fn new() -> Self {
257 let trace = Arc::new(Mutex::new(BytesMut::new()));
258 Self { trace }
259 }
260
261 pub(crate) fn trace(&self, data: &[u8]) {
262 let mut trace = self.trace.lock().unwrap();
263 trace.extend_from_slice(data);
264 }
265
266 pub(crate) fn split_trace(&self) -> Option<Bytes> {
267 let mut trace = self.trace.lock().unwrap();
268 if trace.is_empty() {
269 None
270 } else {
271 Some(trace.split().freeze())
272 }
273 }
274}
275
276#[derive(Debug)]
278pub struct Job {
279 response: Arc<JobResponse>,
280 client: Client,
281 log: JobLog,
282 build_dir: PathBuf,
283 artifacts: ArtifactCache,
284}
285
286impl Job {
287 pub(crate) fn new(
288 client: Client,
289 response: Arc<JobResponse>,
290 build_dir: PathBuf,
291 log: JobLog,
292 ) -> Self {
293 Self {
294 client,
295 response,
296 log,
297 build_dir,
298 artifacts: ArtifactCache::new(),
299 }
300 }
301
302 pub fn id(&self) -> u64 {
304 self.response.id
305 }
306
307 pub fn trace<D: AsRef<[u8]>>(&self, data: D) {
312 self.log.trace(data.as_ref());
313 }
314
315 pub fn variable(&self, key: &str) -> Option<Variable<'_>> {
317 self.response.variables.get(key).map(|v| Variable { v })
318 }
319
320 pub fn variables(&self) -> impl Iterator<Item = Variable<'_>> {
322 self.response.variables.values().map(|v| Variable { v })
323 }
324
325 pub fn dependencies(&self) -> impl Iterator<Item = Dependency<'_>> {
327 self.response
328 .dependencies
329 .iter()
330 .map(move |dependency| Dependency {
331 job: self,
332 dependency,
333 })
334 }
335
336 pub fn build_dir(&self) -> &Path {
338 &self.build_dir
339 }
340}