1use crate::artifact::Artifact;
3use crate::client::{Client, JobArtifactFile, JobDependency, JobResponse, JobVariable};
4use bytes::{Bytes, BytesMut};
5use std::collections::HashMap;
6use std::path::{Path, PathBuf};
7use std::sync::{Arc, Mutex};
8use tracing::info;
9
10use crate::client::Error as ClientError;
11
12pub struct Variable<'a> {
20 v: &'a JobVariable,
21}
22
23impl<'a> Variable<'a> {
24 pub fn key(&self) -> &'a str {
26 &self.v.key
27 }
28
29 pub fn value(&self) -> &'a str {
31 &self.v.value
32 }
33
34 pub fn masked(&self) -> bool {
36 self.v.masked
37 }
38
39 pub fn public(&self) -> bool {
41 self.v.public
42 }
43}
44
45impl std::fmt::Display for Variable<'_> {
46 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
47 if self.v.masked {
48 write!(f, "<MASKED>")
49 } else {
50 write!(f, "{}", self.v.value)
51 }
52 }
53}
54
55#[derive(Debug)]
61pub struct Dependency<'a> {
62 job: &'a Job,
63 dependency: &'a JobDependency,
64}
65
66impl Dependency<'_> {
67 pub fn id(&self) -> u64 {
71 self.dependency.id
72 }
73
74 pub fn name(&self) -> &str {
78 &self.dependency.name
79 }
80
81 pub fn artifact_filename(&self) -> Option<&str> {
83 self.dependency
84 .artifacts_file
85 .as_ref()
86 .map(|a| a.filename.as_str())
87 }
88
89 pub fn artifact_size(&self) -> Option<usize> {
91 self.dependency.artifacts_file.as_ref().map(|a| a.size)
92 }
93
94 async fn download_to_file(&self, _file: &JobArtifactFile) -> Result<(), ClientError> {
95 let mut path = self.job.build_dir.join("artifacts");
96 if let Err(e) = tokio::fs::create_dir(&path).await {
97 if e.kind() != std::io::ErrorKind::AlreadyExists {
98 return Err(ClientError::WriteFailure(e));
99 }
100 }
101
102 path.push(format!("{}.zip", self.id()));
104 let mut f = tokio::fs::File::create(&path)
105 .await
106 .map_err(ClientError::WriteFailure)?;
107 self.job
108 .client
109 .download_artifact(self.dependency.id, &self.dependency.token, &mut f)
110 .await?;
111 self.job.artifacts.insert_file(self.dependency.id, path);
112 Ok(())
113 }
114
115 async fn download_to_mem(&self, file: &JobArtifactFile) -> Result<(), ClientError> {
116 let mut bytes = Vec::with_capacity(file.size);
117 self.job
118 .client
119 .download_artifact(self.dependency.id, &self.dependency.token, &mut bytes)
120 .await?;
121 self.job.artifacts.insert_data(self.dependency.id, bytes);
122 Ok(())
123 }
124
125 pub async fn download(&self) -> Result<Option<Artifact>, ClientError> {
130 if let Some(file) = &self.dependency.artifacts_file {
131 let cached = self.job.artifacts.get(self.dependency.id).await?;
132 if cached.is_some() {
133 return Ok(cached);
134 }
135
136 if file.size > 64 * 1024 {
139 info!("Downloading dependency {} to file", self.dependency.id);
140 self.download_to_file(file).await?
141 } else {
142 info!("Downloading dependency {} to mem", self.dependency.id);
143 self.download_to_mem(file).await?
144 }
145 self.job.artifacts.get(self.dependency.id).await
146 } else {
147 Ok(None)
148 }
149 }
150}
151
152#[derive(Debug, Clone)]
153struct ArcU8(Arc<Vec<u8>>);
154
155impl ArcU8 {
156 fn new(data: Vec<u8>) -> Self {
157 Self(Arc::new(data))
158 }
159}
160
161impl AsRef<[u8]> for ArcU8 {
162 fn as_ref(&self) -> &[u8] {
163 self.0.as_ref().as_ref()
164 }
165}
166
167#[derive(Clone, Debug)]
168enum CacheData {
169 MemoryBacked(ArcU8),
170 FileBacked(PathBuf),
171}
172
173#[derive(Debug)]
174struct ArtifactCache {
175 data: Mutex<HashMap<u64, CacheData>>,
176}
177
178impl ArtifactCache {
179 fn new() -> Self {
180 ArtifactCache {
181 data: Mutex::new(HashMap::new()),
182 }
183 }
184
185 fn insert_file(&self, id: u64, path: PathBuf) {
186 let mut d = self.data.lock().unwrap();
187 d.insert(id, CacheData::FileBacked(path));
188 }
189
190 fn insert_data(&self, id: u64, data: Vec<u8>) {
191 let mut d = self.data.lock().unwrap();
192 d.insert(id, CacheData::MemoryBacked(ArcU8::new(data)));
193 }
194
195 fn lookup(&self, id: u64) -> Option<CacheData> {
196 let d = self.data.lock().unwrap();
197 d.get(&id).cloned()
198 }
199
200 async fn get(&self, id: u64) -> Result<Option<Artifact>, ClientError> {
201 if let Some(data) = self.lookup(id) {
202 match data {
203 CacheData::MemoryBacked(m) => {
204 Ok(Some(Artifact::new(Box::new(std::io::Cursor::new(m)))?))
205 }
206 CacheData::FileBacked(p) => {
207 let f = tokio::fs::File::open(p)
208 .await
209 .map_err(ClientError::WriteFailure)?;
210 Ok(Some(Artifact::new(Box::new(f.try_into_std().unwrap()))?))
212 }
213 }
214 } else {
215 Ok(None)
216 }
217 }
218}
219
220#[derive(Clone, Debug)]
221pub(crate) struct JobLog {
222 trace: Arc<Mutex<BytesMut>>,
223}
224
225impl JobLog {
226 pub(crate) fn new() -> Self {
227 let trace = Arc::new(Mutex::new(BytesMut::new()));
228 Self { trace }
229 }
230
231 pub(crate) fn trace(&self, data: &[u8]) {
232 let mut trace = self.trace.lock().unwrap();
233 trace.extend_from_slice(data);
234 }
235
236 pub(crate) fn split_trace(&self) -> Option<Bytes> {
237 let mut trace = self.trace.lock().unwrap();
238 if trace.is_empty() {
239 None
240 } else {
241 Some(trace.split().freeze())
242 }
243 }
244}
245
246#[derive(Debug)]
248pub struct Job {
249 response: Arc<JobResponse>,
250 client: Client,
251 log: JobLog,
252 build_dir: PathBuf,
253 artifacts: ArtifactCache,
254}
255
256impl Job {
257 pub(crate) fn new(
258 client: Client,
259 response: Arc<JobResponse>,
260 build_dir: PathBuf,
261 log: JobLog,
262 ) -> Self {
263 Self {
264 client,
265 response,
266 log,
267 build_dir,
268 artifacts: ArtifactCache::new(),
269 }
270 }
271
272 pub fn id(&self) -> u64 {
274 self.response.id
275 }
276
277 pub fn trace<D: AsRef<[u8]>>(&self, data: D) {
282 self.log.trace(data.as_ref());
283 }
284
285 pub fn variable(&self, key: &str) -> Option<Variable> {
287 self.response.variables.get(key).map(|v| Variable { v })
288 }
289
290 pub fn variables(&self) -> impl Iterator<Item = Variable> {
292 self.response.variables.values().map(|v| Variable { v })
293 }
294
295 pub fn dependencies(&self) -> impl Iterator<Item = Dependency> {
297 self.response
298 .dependencies
299 .iter()
300 .map(move |dependency| Dependency {
301 job: self,
302 dependency,
303 })
304 }
305
306 pub fn build_dir(&self) -> &Path {
308 &self.build_dir
309 }
310}