use std::{
collections::{BTreeSet, HashMap},
str::FromStr,
time::Duration,
};
use url::Url;
use uuid::Uuid;
use radicle::{
git::Oid,
node::{
Handle, Node,
sync::{Announcer, AnnouncerConfig, ReplicationFactor},
},
prelude::{Profile, ReadStorage, RepoId},
storage::git::Repository,
};
use radicle_job::*;
use crate::{logger, msg::RunId};
/// Creates job COBs and remembers which job belongs to which
/// `(repository, Git object)` pair, so repeated requests for the same pair
/// can be answered from memory.
pub struct KnownJobCobs {
/// Radicle profile; used to open repository storage, obtain a signer and
/// locate the node socket.
profile: Profile,
/// Job ids already found or created, keyed by repository id and Git object id.
known: HashMap<(RepoId, Oid), JobId>,
}
impl KnownJobCobs {
    /// Loads the Radicle profile and starts with an empty cache of known jobs.
    ///
    /// # Errors
    ///
    /// Returns [`JobError::Profile`] if the profile cannot be loaded.
    pub fn new() -> Result<Self, JobError> {
        let profile = Profile::load().map_err(JobError::Profile)?;
        Ok(Self {
            profile,
            known: HashMap::new(),
        })
    }

    /// Returns the id of the job COB for Git object `oid` in repository
    /// `repo_id`, creating the COB if none exists yet.
    ///
    /// Ids are cached, so a pair that has been resolved once is answered
    /// without touching storage again. On failure the error is logged and
    /// `None` is returned; nothing is cached in that case, so a later call
    /// retries.
    pub fn create_job(&mut self, repo_id: RepoId, oid: Oid) -> Option<JobId> {
        let key = (repo_id, oid);
        if let Some(job_id) = self.known.get(&key) {
            return Some(*job_id);
        }

        match self.fallible_create_job(repo_id, oid) {
            Ok(job_id) => {
                self.known.insert(key, job_id);
                Some(job_id)
            }
            Err(err) => {
                // Every failure is reported here, once, so that errors from
                // opening the repository, creating the signer or creating
                // the COB are not silently dropped.
                logger::job_failure(
                    "failed to find or create job COB for Git object",
                    &repo_id,
                    &oid,
                    Some(&err),
                );
                None
            }
        }
    }

    /// Looks up the job COB for `oid` in the repository and creates one if
    /// the lookup reports [`JobError::NoJob`].
    ///
    /// Any other lookup error is returned unchanged; the caller is
    /// responsible for logging it.
    fn fallible_create_job(&self, repo_id: RepoId, oid: Oid) -> Result<JobId, JobError> {
        let repo = repository(&self.profile, repo_id)?;
        let signer = self.profile.signer().map_err(JobError::Signer)?;
        let mut jobs = jobs(&repo)?;
        match job_for_commit(&jobs, oid) {
            Ok(job_id) => {
                logger::job_reuse(&repo_id, &oid, &job_id);
                Ok(job_id)
            }
            Err(JobError::NoJob(_)) => {
                let job = jobs.create(oid, &signer).map_err(JobError::CreateJob)?;
                // The flag is `false`, so `announce` returns immediately
                // without contacting the node.
                announce(&self.profile, repo_id, false)?;
                logger::job_create(&repo_id, &oid, job.id());
                Ok(*job.id())
            }
            Err(err) => Err(err),
        }
    }

    /// Adds a run identified by `run_id`, with the given `url`, to the job
    /// COB for `oid` in `repo_id`, creating the job first if necessary.
    ///
    /// When `announce` is true the change is announced via the node.
    /// Failures are logged, not returned.
    pub fn create_run(
        &mut self,
        repo_id: RepoId,
        oid: Oid,
        run_id: RunId,
        url: &Url,
        announce: bool,
    ) {
        if let Some(job_id) = self.create_job(repo_id, oid) {
            if let Err(err) = self.fallible_create_run(job_id, repo_id, run_id, url, announce) {
                logger::job_failure(
                    "failed to add a run to a job COB",
                    &repo_id,
                    &oid,
                    Some(&err),
                );
            }
        }
    }

    /// Adds a run to the job `job_id` and optionally announces the change.
    ///
    /// `run_id` must parse as a UUID. Note that the announcement happens
    /// after the run has been added, so an announcement error is returned
    /// even though the run is already recorded in the COB.
    fn fallible_create_run(
        &self,
        job_id: JobId,
        repo_id: RepoId,
        run_id: RunId,
        url: &Url,
        should_announce: bool,
    ) -> Result<(), JobError> {
        let uuid = Uuid::from_str(run_id.as_str()).map_err(JobError::Uuid)?;
        let repo = repository(&self.profile, repo_id)?;
        let signer = self.profile.signer().map_err(JobError::Signer)?;
        let mut jobs = jobs(&repo)?;
        let mut job = jobs.get_mut(&job_id).map_err(JobError::GetJobMut)?;
        job.run(uuid, url.clone(), &signer)
            .map_err(JobError::AddRun)?;
        announce(&self.profile, repo_id, should_announce)?;
        logger::job_run_create(job_id, uuid);
        Ok(())
    }
}
/// Marks run `run_id` of the job for Git object `oid` in `repo_id` as
/// succeeded.
///
/// Errors are logged rather than returned.
pub fn succeeded(repo_id: RepoId, oid: Oid, run_id: RunId) {
    let outcome = finish(repo_id, oid, run_id, Reason::Succeeded);
    if let Err(err) = outcome {
        logger::job_failure(
            "failed to mark a run as succeeded",
            &repo_id,
            &oid,
            Some(&err),
        );
    }
}
/// Marks run `run_id` of the job for Git object `oid` in `repo_id` as
/// failed.
///
/// Errors are logged rather than returned.
pub fn failed(repo_id: RepoId, oid: Oid, run_id: RunId) {
    let outcome = finish(repo_id, oid, run_id, Reason::Failed);
    if let Err(err) = outcome {
        logger::job_failure(
            "failed to mark a run as failed",
            &repo_id,
            &oid,
            Some(&err),
        );
    }
}
fn finish(repo_id: RepoId, oid: Oid, run_id: RunId, reason: Reason) -> Result<(), JobError> {
let uuid = Uuid::from_str(run_id.as_str()).map_err(JobError::Uuid)?;
let profile = profile()?;
let repo = repository(&profile, repo_id)?;
let signer = profile.signer().map_err(JobError::Signer)?;
let mut jobs = jobs(&repo)?;
let job_id = job_for_commit(&jobs, oid)?;
let mut job = jobs.get_mut(&job_id).map_err(JobError::GetJobMut)?;
job.finish(uuid, reason, &signer)
.map_err(JobError::Finish)?;
announce(&profile, repo_id, true)?;
logger::job_run_finished(job_id, uuid, reason);
Ok(())
}
/// Loads the Radicle profile, mapping a load failure to
/// [`JobError::Profile`].
fn profile() -> Result<Profile, JobError> {
    let loaded = Profile::load();
    loaded.map_err(JobError::Profile)
}
/// Opens repository `repo_id` from the profile's storage, mapping a failure
/// to [`JobError::OpenRepository`].
fn repository(profile: &Profile, repo_id: RepoId) -> Result<Repository, JobError> {
    let storage = &profile.storage;
    let opened = storage.repository(repo_id);
    opened.map_err(JobError::OpenRepository)
}
/// Opens the job COB store of `repo`, mapping a failure to
/// [`JobError::Jobs`].
///
/// The returned store borrows `repo`.
fn jobs(repo: &Repository) -> Result<Jobs<'_, Repository>, JobError> {
    Jobs::open(repo).map_err(JobError::Jobs)
}
/// Finds the job COB whose Git object id equals `wanted`.
///
/// Iterates over all jobs in the store and returns the id of the first
/// match.
///
/// # Errors
///
/// Returns [`JobError::NoJob`] if no job matches, [`JobError::AllJobs`] if
/// the jobs cannot be listed, and [`JobError::AllJobsJob`] if an individual
/// entry cannot be read.
fn job_for_commit(jobs: &Jobs<'_, Repository>, wanted: Oid) -> Result<JobId, JobError> {
    for item in jobs.all().map_err(JobError::AllJobs)? {
        let (id, job) = item.map_err(JobError::AllJobsJob)?;
        if job.oid() == &wanted {
            return Ok(JobId::from(id));
        }
    }
    Err(JobError::NoJob(wanted))
}
/// Announces repository `repo_id` through the local node when `announce` is
/// true; does nothing and returns `Ok(())` otherwise.
///
/// The node is reached via the socket path of the profile's home. The
/// repository's seeds are split by `is_synced()` into two sets that are
/// handed to the announcer, which is configured with
/// `ReplicationFactor::MustReach(1)`. The announcement is given a
/// five-second timeout.
///
/// # Errors
///
/// Returns [`JobError::Seeds`] if the seeds cannot be fetched,
/// [`JobError::Announcer`] if the announcer cannot be constructed, and
/// [`JobError::Announce`] if the announcement itself fails.
fn announce(profile: &Profile, repo_id: RepoId, announce: bool) -> Result<(), JobError> {
    const TIMEOUT: Duration = Duration::from_secs(5);

    if !announce {
        return Ok(());
    }

    let mut node = Node::new(profile.home.socket());

    let seeds = node.seeds(repo_id).map_err(JobError::Seeds)?;
    let mut synced = BTreeSet::new();
    let mut unsynced = BTreeSet::new();
    for seed in seeds.iter() {
        if seed.is_synced() {
            synced.insert(seed.nid);
        } else {
            unsynced.insert(seed.nid);
        }
    }

    let config = AnnouncerConfig::public(
        *profile.id(),
        ReplicationFactor::MustReach(1),
        BTreeSet::new(),
        synced,
        unsynced,
    );
    let announcer = Announcer::new(config).map_err(|_| JobError::Announcer)?;

    node.announce(repo_id, TIMEOUT, announcer, |_, _| ())
        .map_err(JobError::Announce)?;
    Ok(())
}
/// Errors that can occur while creating, updating or announcing job COBs.
///
/// Most variants wrap the underlying error as their `source`.
#[derive(Debug, thiserror::Error)]
pub enum JobError {
/// Loading the Radicle profile failed.
#[error("failed to load Radicle profile")]
Profile(#[source] radicle::profile::Error),
/// The repository could not be opened from the profile's storage.
#[error("failed to open repository in Radicle node storage")]
OpenRepository(#[source] radicle::storage::RepositoryError),
/// Opening the job COB store for a repository failed.
#[error("failed to list job COBs in repository")]
Jobs(#[source] radicle::storage::RepositoryError),
/// Listing all job COBs in the store failed.
#[error("failed to get all job COBs")]
AllJobs(#[source] radicle::cob::store::Error),
/// Creating a new job COB failed.
#[error("failed to create a new job COB")]
CreateJob(#[source] radicle::cob::store::Error),
/// The profile could not provide a signer.
#[error("failed to create a signer for Radicle repository")]
Signer(#[source] radicle::profile::Error),
/// An individual entry could not be read while iterating over job COBs.
#[error("couldn't get job when iterating")]
AllJobsJob(#[source] radicle::cob::store::Error),
/// A job COB could not be opened for modification.
#[error("failed to get mutable job COB")]
GetJobMut(#[source] radicle::cob::store::Error),
/// Adding a run to a job COB failed.
#[error("failed to add a run to a job COB")]
AddRun(#[source] radicle::cob::store::Error),
/// Recording the outcome of a run failed.
#[error("could not mark a run as finished")]
Finish(#[source] radicle::cob::store::Error),
/// A run id could not be parsed as a UUID.
#[error("failed to construct a UUID from a run id")]
Uuid(#[source] uuid::Error),
/// No job COB exists for the given Git object id.
#[error("failed to find job COB for oid {0}")]
NoJob(Oid),
/// A job COB already exists for the given Git object id.
// NOTE(review): this variant is not constructed anywhere in the code visible
// here; confirm whether other modules still use it.
#[error("a job for Git object {0} already exists")]
JobExists(Oid),
/// The node could not report the seeds of a repository.
#[error("failed to get seeds for node")]
Seeds(#[source] radicle::node::Error),
/// The node failed to announce the change.
#[error("failed to announce COB change")]
Announce(#[source] radicle::node::Error),
/// Constructing the announcer failed; the underlying error is discarded
/// at the point where this variant is created.
#[error("failed to create a COB announcer")]
Announcer,
}