use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};

use bytes::{Bytes, BytesMut};
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio_retry2::strategy::{FibonacciBackoff, jitter};
use tracing::info;

use crate::artifact::Artifact;
use crate::client::Error as ClientError;
use crate::client::{Client, JobArtifactFile, JobDependency, JobResponse, JobVariable};
use crate::outputln;
/// Read-only view over a single variable from the job response.
pub struct Variable<'a> {
    // Borrowed from the `JobResponse` held by the owning `Job`.
    v: &'a JobVariable,
}
impl<'a> Variable<'a> {
    /// The variable's name.
    pub fn key(&self) -> &'a str {
        self.v.key.as_str()
    }

    /// The variable's raw value, regardless of masking.
    pub fn value(&self) -> &'a str {
        self.v.value.as_str()
    }

    /// Whether the value must be hidden from log output.
    pub fn masked(&self) -> bool {
        self.v.masked
    }

    /// Whether the variable is flagged as public.
    pub fn public(&self) -> bool {
        self.v.public
    }
}
impl std::fmt::Display for Variable<'_> {
    /// Displays the value, substituting a placeholder for masked variables
    /// so they never leak into formatted output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
        let shown = if self.v.masked {
            "<MASKED>"
        } else {
            self.v.value.as_str()
        };
        f.write_str(shown)
    }
}
/// A job this job depends on, whose artifacts can be downloaded.
#[derive(Debug)]
pub struct Dependency<'a> {
    // The consuming job; provides the client, build dir, and artifact cache.
    job: &'a Job,
    // Raw dependency description from the job response.
    dependency: &'a JobDependency,
}
/// Whether a failed artifact download is worth retrying.
///
/// Transport-level request failures and 5xx responses are considered
/// transient; everything else is treated as permanent.
fn is_retriable_error(err: &ClientError) -> bool {
    match err {
        ClientError::Request(_) => true,
        ClientError::UnexpectedStatus(status) if status.is_server_error() => true,
        _ => false,
    }
}
impl Dependency<'_> {
    /// Id of the dependency job.
    pub fn id(&self) -> u64 {
        self.dependency.id
    }

    /// Name of the dependency job.
    pub fn name(&self) -> &str {
        &self.dependency.name
    }

    /// Filename of the dependency's artifact archive, if it has one.
    pub fn artifact_filename(&self) -> Option<&str> {
        self.dependency
            .artifacts_file
            .as_ref()
            .map(|a| a.filename.as_str())
    }

    /// Size in bytes of the dependency's artifact archive, if it has one.
    pub fn artifact_size(&self) -> Option<usize> {
        self.dependency.artifacts_file.as_ref().map(|a| a.size)
    }

    /// Stream the dependency's artifact into `writer`, retrying retriable
    /// failures with a jittered Fibonacci backoff (at most 4 retries,
    /// starting around 900ms).
    ///
    /// NOTE(review): a retry reuses the same `writer` without rewinding it,
    /// so bytes written by a failed partial attempt would precede the full
    /// retried download. This is only correct if `download_artifact` cannot
    /// fail after it has begun writing — confirm against the client impl.
    async fn download_impl<A>(&self, mut writer: A) -> Result<(), ClientError>
    where
        A: AsyncWrite + Unpin,
    {
        let mut strategy = FibonacciBackoff::from_millis(900).map(jitter).take(4);
        loop {
            let r = self
                .job
                .client
                .download_artifact(self.dependency.id, &self.dependency.token, &mut writer)
                .await;
            match (r, strategy.next()) {
                (Err(err), Some(d)) if is_retriable_error(&err) => {
                    outputln!(
                        "Error getting artifacts from {}: {err}. Retrying",
                        self.dependency.name
                    );
                    tokio::time::sleep(d).await;
                }
                // Success, a non-retriable error, or retry budget exhausted.
                (r, _) => return r,
            }
        }
    }

    /// Download the artifact to `<build_dir>/artifacts/<id>.zip` and record
    /// the path in the job's artifact cache.
    async fn download_to_file(&self, _file: &JobArtifactFile) -> Result<(), ClientError> {
        let mut path = self.job.build_dir.join("artifacts");
        // The artifacts directory may already exist from an earlier download.
        if let Err(e) = tokio::fs::create_dir(&path).await
            && e.kind() != std::io::ErrorKind::AlreadyExists
        {
            return Err(ClientError::WriteFailure(e));
        }
        path.push(format!("{}.zip", self.id()));
        let mut f = tokio::fs::File::create(&path)
            .await
            .map_err(ClientError::WriteFailure)?;
        self.download_impl(&mut f).await?;
        // Dropping a tokio File does not guarantee buffered data reaches the
        // OS; flush explicitly before publishing the path to the cache.
        f.flush().await.map_err(ClientError::WriteFailure)?;
        self.job.artifacts.insert_file(self.dependency.id, path);
        Ok(())
    }

    /// Download the artifact into an in-memory buffer preallocated from the
    /// advertised size, and record it in the job's artifact cache.
    async fn download_to_mem(&self, file: &JobArtifactFile) -> Result<(), ClientError> {
        let mut bytes = Vec::with_capacity(file.size);
        self.download_impl(&mut bytes).await?;
        self.job.artifacts.insert_data(self.dependency.id, bytes);
        Ok(())
    }

    /// Fetch this dependency's artifact, reusing the job-level cache when it
    /// was downloaded before.
    ///
    /// Returns `Ok(None)` when the dependency has no artifact archive.
    /// Archives larger than 64 KiB are spilled to disk; smaller ones are
    /// kept in memory.
    pub async fn download(&self) -> Result<Option<Artifact>, ClientError> {
        if let Some(file) = &self.dependency.artifacts_file {
            let cached = self.job.artifacts.get(self.dependency.id).await?;
            if cached.is_some() {
                return Ok(cached);
            }
            if file.size > 64 * 1024 {
                info!("Downloading dependency {} to file", self.dependency.id);
                self.download_to_file(file).await?
            } else {
                info!("Downloading dependency {} to mem", self.dependency.id);
                self.download_to_mem(file).await?
            }
            self.job.artifacts.get(self.dependency.id).await
        } else {
            Ok(None)
        }
    }
}
/// Immutable byte buffer that is cheap to clone (shared via `Arc`).
#[derive(Debug, Clone)]
struct ArcU8(Arc<Vec<u8>>);

impl ArcU8 {
    /// Wrap `data` in a shared, reference-counted buffer.
    fn new(data: Vec<u8>) -> Self {
        ArcU8(Arc::new(data))
    }
}

impl AsRef<[u8]> for ArcU8 {
    /// Borrow the underlying bytes as a slice.
    fn as_ref(&self) -> &[u8] {
        self.0.as_slice()
    }
}
/// Where a cached artifact's bytes live.
#[derive(Clone, Debug)]
enum CacheData {
    /// Small artifacts held entirely in memory; cloning is cheap (Arc).
    MemoryBacked(ArcU8),
    /// Larger artifacts spilled to a file at this path.
    FileBacked(PathBuf),
}
/// Thread-safe map from dependency job id to its downloaded artifact data.
#[derive(Debug)]
struct ArtifactCache {
    // std Mutex is fine here: guards are only held for short synchronous
    // sections, never across an `.await`.
    data: Mutex<HashMap<u64, CacheData>>,
}
impl ArtifactCache {
    /// Create an empty cache.
    fn new() -> Self {
        ArtifactCache {
            data: Mutex::new(HashMap::new()),
        }
    }

    /// Record that artifact `id` is stored on disk at `path`.
    fn insert_file(&self, id: u64, path: PathBuf) {
        let mut d = self.data.lock().unwrap();
        d.insert(id, CacheData::FileBacked(path));
    }

    /// Record artifact `id`'s bytes as an in-memory buffer.
    fn insert_data(&self, id: u64, data: Vec<u8>) {
        let mut d = self.data.lock().unwrap();
        d.insert(id, CacheData::MemoryBacked(ArcU8::new(data)));
    }

    /// Cheap clone of the cache entry for `id`, if any.
    fn lookup(&self, id: u64) -> Option<CacheData> {
        let d = self.data.lock().unwrap();
        d.get(&id).cloned()
    }

    /// Open artifact `id` as an `Artifact`; `Ok(None)` if it was never cached.
    async fn get(&self, id: u64) -> Result<Option<Artifact>, ClientError> {
        let Some(data) = self.lookup(id) else {
            return Ok(None);
        };
        match data {
            CacheData::MemoryBacked(m) => {
                Ok(Some(Artifact::new(Box::new(std::io::Cursor::new(m)))?))
            }
            CacheData::FileBacked(p) => {
                // NOTE(review): `WriteFailure` wraps what is actually a
                // read/open error here; if `ClientError` has a dedicated read
                // variant it would fit better — confirm against the client.
                let f = tokio::fs::File::open(p)
                    .await
                    .map_err(ClientError::WriteFailure)?;
                // A freshly opened file has no in-flight async operations,
                // so converting to a std file cannot fail.
                let f = f
                    .try_into_std()
                    .expect("no in-flight operations on a newly opened file");
                Ok(Some(Artifact::new(Box::new(f))?))
            }
        }
    }
}
/// Shared, append-only buffer of a job's trace output.
///
/// Clones share the same underlying buffer.
#[derive(Clone, Debug)]
pub(crate) struct JobLog {
    // Bytes accumulated since the last `split_trace` call.
    trace: Arc<Mutex<BytesMut>>,
}
impl JobLog {
    /// Create an empty log buffer.
    pub(crate) fn new() -> Self {
        Self {
            trace: Arc::new(Mutex::new(BytesMut::new())),
        }
    }

    /// Append raw trace bytes to the log.
    pub(crate) fn trace(&self, data: &[u8]) {
        self.trace.lock().unwrap().extend_from_slice(data);
    }

    /// Take everything accumulated so far, leaving the buffer empty.
    /// Returns `None` when nothing has been appended since the last split.
    pub(crate) fn split_trace(&self) -> Option<Bytes> {
        let mut buffered = self.trace.lock().unwrap();
        (!buffered.is_empty()).then(|| buffered.split().freeze())
    }
}
/// A single CI job: the server's job description plus everything needed to
/// run it locally (client connection, log buffer, build directory, and a
/// cache of downloaded dependency artifacts).
#[derive(Debug)]
pub struct Job {
    // Job description as received from the server.
    response: Arc<JobResponse>,
    // Client used to talk back to the server (e.g. artifact downloads).
    client: Client,
    // Shared trace buffer for this job's output.
    log: JobLog,
    // Directory the job builds in; also hosts downloaded artifacts.
    build_dir: PathBuf,
    // Cache of dependency artifacts, keyed by dependency job id.
    artifacts: ArtifactCache,
}
impl Job {
    /// Assemble a job from its server response and runtime context.
    pub(crate) fn new(
        client: Client,
        response: Arc<JobResponse>,
        build_dir: PathBuf,
        log: JobLog,
    ) -> Self {
        Self {
            response,
            client,
            log,
            build_dir,
            artifacts: ArtifactCache::new(),
        }
    }

    /// The job's unique id.
    pub fn id(&self) -> u64 {
        self.response.id
    }

    /// Append `data` to the job's trace log.
    pub fn trace<D: AsRef<[u8]>>(&self, data: D) {
        self.log.trace(data.as_ref());
    }

    /// Look up a single job variable by key.
    pub fn variable(&self, key: &str) -> Option<Variable<'_>> {
        let v = self.response.variables.get(key)?;
        Some(Variable { v })
    }

    /// Iterate over all variables defined for this job.
    pub fn variables(&self) -> impl Iterator<Item = Variable<'_>> {
        self.response.variables.values().map(|v| Variable { v })
    }

    /// Iterate over the jobs this job depends on.
    pub fn dependencies(&self) -> impl Iterator<Item = Dependency<'_>> {
        let job = self;
        self.response
            .dependencies
            .iter()
            .map(move |dependency| Dependency { job, dependency })
    }

    /// The directory this job builds in.
    pub fn build_dir(&self) -> &Path {
        self.build_dir.as_path()
    }
}