use super::{Edge, Graph, State};
use crate::config::Configuration;
use crate::execution::commander::Commander;
use crate::execution::BoxError;
use crate::job::{Code, Job, JobId, Relation, Rule};
use core::iter::once;
use daggy::{NodeIndex, WouldCycle};
use log::{info, trace};
use petgraph::Direction;
use serde::Deserialize;
use serde_content::Value;
use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;
use url::Url;
/// Errors reported while configuring a [`Builder`] or building a [`Graph`] from it.
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum Error {
/// A job with the same id was configured or built twice.
/// `at` is the source of the new job, `old` the source of the existing one.
#[error("Duplicate job detected: {id} at {at:?}, already added in {old:?}.")]
DuplicateJob {
id: JobId,
at: Box<str>,
old: Box<str>,
},
/// Adding an edge to the graph was rejected because it would close a cycle.
/// The payload is the textual form of the rejected edge.
#[error("Graph cycle detected: {0}.")]
CycleDetected(String),
/// A job name contained the `@` separator, which is reserved for references.
#[error("Invalid job name: {name:?} in {namespace:?} at {at:?}.")]
InvalidJobName {
namespace: Vec<Box<str>>,
name: Box<str>,
at: Box<str>,
},
/// A job reference had more than two `@`-separated parts.
#[error("Invalid job reference: {reference:?} by {origin:?} in {namespace:?} at {at:?}.")]
InvalidJobReference {
namespace: Vec<Box<str>>,
origin: Box<str>,
reference: Box<str>,
at: Box<str>,
},
/// An interpretter with the same name was already configured for the project.
#[error("Project already has interpretter: {name:?} in {namespace:?} at {at}")]
DuplicateInterpretter {
namespace: Vec<Box<str>>,
name: Box<str>,
at: Box<str>,
},
/// The provider factory failed while creating the commander for a job.
/// `name` is the interpretter name the job asked for, if any.
#[error("Could not create commander: {name:?} in {namespace:?}: {source}")]
CommanderError {
namespace: Vec<Box<str>>,
name: Option<Box<str>>,
source: BoxError,
},
/// The interpretter specification could not be deserialized into the
/// configuration type expected by the provider.
#[error("Could not create interpretter {interpretter:?} for provider: {provider:?}: {source}")]
InterpretterError {
provider: Box<str>,
interpretter: Box<str>,
source: serde_content::Error,
},
/// A project was configured twice for the same namespace.
/// `old_name` is the name registered first, `name` the rejected one.
#[error("Already have a project: {old_name:?} in {namespace:?}, now {name:?} at {at}")]
DuplicateProject {
namespace: Vec<Box<str>>,
name: Box<str>,
at: Box<str>,
old_name: Box<str>,
},
}
/// Accumulates project, interpretter and job configuration and turns it into
/// a [`Graph`] with [`Builder::build`].
#[derive(Default)]
pub struct Builder {
/// Configured projects, keyed by their namespace path.
projects: BTreeMap<Vec<Box<str>>, Project>,
/// Commander factories, keyed by provider name. Each factory receives the
/// interpretter name and its raw specification.
#[allow(clippy::type_complexity)]
providers: BTreeMap<
Box<str>,
Box<dyn Fn(&str, &Value) -> Result<Box<dyn 'static + Commander + Send + Sync>, BoxError>>,
>,
}
/// Everything configured for a single namespace.
#[derive(Debug, Default)]
struct Project {
/// Project name; `None` until `configure_project` has been called for the namespace.
name: Option<Box<str>>,
/// Base URL supplied together with the project name.
base: Option<Url>,
/// Jobs of this project, keyed by job name.
jobs: BTreeMap<Box<str>, Arc<Job>>,
/// Edges and gates derived from the relations of the project's jobs.
links: BTreeSet<Link>,
/// Interpretter definitions of this project, keyed by interpretter name.
interpretters: BTreeMap<Box<str>, Interpretter>,
}
impl Builder {
/// Registers a commander factory under the given provider name.
///
/// The stored closure receives the interpretter name and its raw
/// specification, deserializes the specification into `I` and hands both
/// to `factory`. A deserialization failure is reported as
/// [`Error::InterpretterError`]; a failure of `factory` is passed through.
/// Registering the same provider name again replaces the earlier factory.
pub fn configure_provider<
    'de,
    I: Deserialize<'de>,
    C: 'static + Commander + Sync + Send,
    F: 'static + Fn(&str, I) -> Result<C, BoxError>,
>(
    &mut self,
    provider: impl ToString,
    factory: F,
) {
    let provider = provider.to_string().into_boxed_str();
    info!("Adding provider {provider:?}");
    let key = provider.clone();
    self.providers.insert(
        key,
        Box::new(move |interpretter, spec| {
            // The deserializer needs an owned value, so detach the spec first.
            let owned_spec = spec.clone().into_owned();
            let deserializer = serde_content::Deserializer::new(owned_spec);
            let config = I::deserialize(deserializer).map_err(|source| {
                // Error context is only materialized when it is needed.
                Error::InterpretterError {
                    provider: provider.clone(),
                    interpretter: interpretter.into(),
                    source,
                }
            })?;
            let commander = factory(interpretter, config)?;
            Ok(Box::new(commander) as Box<dyn Commander + Send + Sync>)
        }),
    );
}
pub fn build(&self) -> Result<Graph, Error> {
let mut indices = BTreeMap::default();
let mut graph = Graph::default();
for (namespace, project) in &self.projects {
for (name, job) in &project.jobs {
assert!(
matches!(&job.id, JobId{namespace:ns,name:n} if namespace == ns && name==n)
);
let interpretter =
self.build_commander(&job.id.namespace, job.interpretter.as_deref())?;
Self::build_job(&mut graph, &mut indices, job.clone(), interpretter)?;
}
for link in &project.links {
Self::build_link(&mut graph, &mut indices, namespace.clone(), link)?;
}
}
Ok(graph)
}
fn build_commander(
&self,
namespace: &[Box<str>],
name: Option<&str>,
) -> Result<Box<dyn Commander + Send + Sync>, Error> {
let find = (|| {
let name = name.unwrap_or_default();
trace!(
"want interpretter in {namespace:?} from {:?}",
self.projects.keys()
);
let project = self.projects.get(namespace)?;
trace!(
"want interpretter {name:?} from {:?}",
project.interpretters.keys()
);
let interpretter = project.interpretters.get(name)?;
trace!("got interpretter {interpretter:?}");
trace!(
"want provider {:?} from {:?}",
interpretter.provider,
self.providers.keys()
);
let provider = self.providers.get(&interpretter.provider)?;
trace!("calling provider");
Some(provider(name, &interpretter.spec))
})();
let Some(result) = find else {
return Ok(Box::new(()));
};
result.map_err(|source| Error::CommanderError {
namespace: namespace.to_vec(),
name: name.map(Into::into),
source,
})
}
/// Adds `job` to the graph, or fills in a node that already exists for its id.
///
/// A job whose id is not indexed yet gets a fresh node that is interpreted
/// with `commander`. If a node already exists, its job is replaced only
/// when the existing job has an empty source.
///
/// NOTE(review): on the replacement path `commander` is dropped unused, and
/// placeholders created by `build_reference` carry a non-empty `urn:job:`
/// source, so they take the duplicate path — confirm both are intended.
///
/// # Errors
///
/// Returns [`Error::DuplicateJob`] when a node with a non-empty source
/// already exists for the job's id.
///
/// # Panics
///
/// Panics if the index map points at a node that is not in the graph.
fn build_job(
    graph: &mut Graph,
    indices: &mut BTreeMap<JobId, usize>,
    job: Arc<Job>,
    commander: Box<dyn Commander + Send + Sync>,
) -> Result<(), Error> {
    log::debug!("Job {} runs interpretter {commander:?}", job.id);

    // The next free node index doubles as the "not seen before" marker.
    let next = graph.dag.node_count();
    let slot = *indices.entry(job.id.clone()).or_insert(next);
    if slot == next {
        let state = State::new(job).interpret_with(commander.into());
        graph.dag.add_node(state);
        return Ok(());
    }

    let existing = graph
        .dag
        .node_weight_mut(NodeIndex::new(slot))
        .expect("node must exist as we have an index of it");
    if !existing.job.source.is_empty() {
        return Err(Error::DuplicateJob {
            id: job.id.clone(),
            at: job.source.clone(),
            old: existing.job.source.clone(),
        });
    }
    existing.job = job;
    Ok(())
}
fn build_job_id(
name: Box<str>,
namespace: Vec<Box<str>>,
source: &str,
) -> Result<JobId, Error> {
if name.contains("@") {
return Err(Error::InvalidJobName {
namespace,
name,
at: source.into(),
});
}
let id = JobId { namespace, name };
Ok(id)
}
/// Materializes one [`Link`] of a project in the graph.
///
/// * `Link::Edge` resolves the target and then the origin reference
///   (creating placeholder nodes where needed) and connects them. An
///   `Outgoing` link becomes a `Trigger` edge from origin to target, an
///   `Incoming` link a `Dependency` edge from target to origin.
/// * `Link::Gate` only makes sure a node exists for the gate name.
///
/// # Errors
///
/// Returns [`Error::InvalidJobReference`] for a malformed reference and
/// [`Error::CycleDetected`] when the edge would close a cycle.
fn build_link(
    graph: &mut Graph,
    indices: &mut BTreeMap<JobId, usize>,
    namespace: Vec<Box<str>>,
    link: &Link,
) -> Result<(), Error> {
    match link {
        Link::Edge {
            source,
            origin,
            target,
            direction,
            codes,
        } => {
            // Resolution order matters: node indices are handed out in the
            // order references are first seen, target before origin.
            let (other, reference) = Self::build_reference(
                graph,
                indices,
                namespace.clone(),
                origin,
                source,
                target.clone(),
            )?;
            // `origin` is borrowed directly; no temporary clone is needed
            // just to obtain a `&str`.
            let (node, origin_id) = Self::build_reference(
                graph,
                indices,
                namespace,
                origin,
                source,
                origin.clone(),
            )?;
            let (from, to, relation) = match direction {
                Direction::Outgoing => (node, other, super::Relation::Trigger),
                Direction::Incoming => (other, node, super::Relation::Dependency),
            };
            let edge = Edge {
                source: source.clone(),
                origin: origin_id,
                reference,
                relation,
                codes: codes.to_vec(),
            };
            graph
                .dag
                .add_edge(NodeIndex::new(from), NodeIndex::new(to), edge)
                .map_err(|WouldCycle(edge)| Error::CycleDetected(edge.to_string()))?;
            Ok(())
        }
        Link::Gate {
            source,
            name,
            origin,
        } => {
            // Only the side effect (node creation) is wanted here.
            Self::build_reference(graph, indices, namespace, origin, source, name.clone())?;
            Ok(())
        }
    }
}
fn build_reference(
graph: &mut Graph,
indices: &mut BTreeMap<JobId, usize>,
namespace: Vec<Box<str>>,
origin: &str,
source: &str,
reference: Box<str>,
) -> Result<(usize, JobId), Error> {
let mut parts = reference.split("@").map(Into::into);
let id = match (parts.next(), parts.next(), parts.next()) {
(None, _, _) => unreachable!(),
(Some(name), None, _) => Ok(JobId { name, namespace }),
(Some(subproject), Some(name), None) => Ok(JobId {
name,
namespace: namespace.iter().cloned().chain(once(subproject)).collect(),
}),
(Some(_), Some(_), Some(_)) => {
Err(Error::InvalidJobReference {
namespace: namespace.clone(),
origin: origin.into(),
reference,
at: source.into(),
})
}
}?;
let top: usize = graph.dag.node_count();
let node = *indices.entry(id.clone()).or_insert(top);
if top == node {
graph.dag.add_node(State::new(Arc::new(Job {
id: id.clone(),
source: format!(
"urn:job:{}",
JobId {
namespace: id.namespace.clone(),
name: origin.into()
}
)
.into_boxed_str(),
..Default::default()
})));
} else {
}
Ok((node, id))
}
}
impl Builder {
fn get_project_mut(&mut self, namespace: Vec<Box<str>>) -> &mut Project {
self.projects.entry(namespace).or_default()
}
}
impl Configuration for Builder {
type Error = Error;
fn configure_project(
&mut self,
namespace: &[Box<str>],
name: Box<str>,
base: Url,
) -> Result<(), Error> {
let project = self.get_project_mut(namespace.to_vec());
if let Some(old_name) = &project.name {
return Err(Error::DuplicateProject {
namespace: namespace.to_vec(),
name,
at: base.as_str().into(),
old_name: old_name.clone(),
});
}
project.name = Some(name);
project.base = Some(base);
Ok(())
}
fn configure_interpretter(
&mut self,
namespace: &[Box<str>],
name: Box<str>,
source: Box<str>,
provider: Box<str>,
spec: Value<'static>,
) -> Result<(), Error> {
let project = self.get_project_mut(namespace.to_vec());
if project.interpretters.contains_key(&name) {
return Err(Error::DuplicateInterpretter {
namespace: namespace.to_vec(),
name,
at: source,
});
}
let replaced = project.interpretters.insert(
name.clone(),
Interpretter {
provider,
spec,
},
);
assert!(replaced.is_none());
Ok(())
}
fn configure_job(
&mut self,
namespace: &[Box<str>],
name: Box<str>,
source: Box<str>,
script: Vec<Box<str>>,
interpretter: Option<Box<str>>,
relations: Vec<Relation>,
) -> Result<(), Error> {
let project = self.get_project_mut(namespace.to_vec());
let id = Self::build_job_id(name, namespace.to_vec(), &source)?;
if let Some(previous) = project.jobs.get(&id.name) {
Err(Error::DuplicateJob {
id,
at: source,
old: previous.source.clone(),
})
} else {
project
.links
.extend(links(relations, &id.namespace, &id.name));
project.jobs.insert(
id.name.clone(),
Arc::new(Job {
id,
source,
script,
interpretter,
}),
);
Ok(())
}
}
}
/// An interpretter definition as configured for a project.
#[derive(Debug)]
struct Interpretter {
/// Name of the provider whose factory creates the commander.
pub provider: Box<str>,
/// Raw specification passed to the provider factory.
pub spec: Value<'static>,
}
/// A pending connection recorded at configuration time and resolved into
/// graph nodes and edges when the graph is built.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
enum Link {
/// An edge between the `origin` and `target` references.
Edge {
/// Location string of the job that declared the relation (a `urn:job:` value).
source: Box<str>,
/// Reference of the node the link is declared on.
origin: Box<str>,
/// Reference of the node at the other end.
target: Box<str>,
/// `Outgoing` yields a trigger edge origin -> target,
/// `Incoming` a dependency edge target -> origin.
direction: Direction,
/// Codes attached to the resulting edge.
codes: Vec<Code>,
},
/// Ensures a node exists for the gate called `name`.
Gate {
/// Location string of the job that declared the rule (a `urn:job:` value).
source: Box<str>,
/// Name of the job or gate the rule is declared under.
origin: Box<str>,
/// Name of the gate node.
name: Box<str>,
},
}
/// Builds the name of a rule gate: `root` and `r` joined by a dot.
fn namefmt(root: &str, r: &str) -> String {
    format!("{root}.{r}")
}
/// Translates the relations of the job `root` in `namespace` into links.
///
/// A trigger relation becomes one outgoing edge from `root` to its target;
/// a `When` relation is expanded by `rule_links` with `root` as the gated
/// node. Every link carries the job's `urn:job:` location as its source.
/// The output keeps the order of `relations`.
fn links(
    relations: impl IntoIterator<Item = Relation>,
    namespace: &[Box<str>],
    root: &str,
) -> Vec<Link> {
    let owner = JobId {
        namespace: namespace.to_vec(),
        name: root.into(),
    };
    let source = format!("urn:job:{}", owner).into_boxed_str();

    relations
        .into_iter()
        .flat_map(|relation| match relation {
            Relation::Trigger { target, codes } => vec![Link::Edge {
                source: source.clone(),
                origin: root.into(),
                target,
                direction: Direction::Outgoing,
                codes,
            }],
            Relation::When(rule) => rule_links(rule, source.clone(), root, true),
        })
        .collect()
}
fn rule_links(rule: Rule, source: Box<str>, root: &str, gated: bool) -> Vec<Link> {
let mut links = vec![];
match rule {
Rule::All {
rules,
name,
} => {
let rname = namefmt(root, &name);
for rule in rules {
for link in rule_links(rule, source.clone(), &rname, true) {
links.push(link);
}
}
if gated {
links.push(Link::Edge {
source: source.clone(),
origin: root.into(),
target: rname.clone().into(),
direction: Direction::Incoming,
codes: vec![],
});
} else {
links.push(Link::Edge {
source: source.clone(),
target: root.into(),
origin: rname.clone().into(),
direction: Direction::Outgoing,
codes: vec![],
});
}
links.push(Link::Gate {
source,
origin: root.into(),
name: rname.into_boxed_str(),
});
}
Rule::Any {
rules,
name,
} => {
let rname = namefmt(root, &name);
for rule in rules {
for link in rule_links(rule, source.clone(), &rname, false) {
links.push(link);
}
}
if gated {
links.push(Link::Edge {
source: source.clone(),
origin: root.into(),
target: rname.clone().into(),
direction: Direction::Incoming,
codes: vec![],
});
} else {
links.push(Link::Edge {
source: source.clone(),
target: root.into(),
origin: rname.clone().into(),
direction: Direction::Outgoing,
codes: vec![],
});
}
links.push(Link::Gate {
source,
origin: root.into(),
name: rname.into_boxed_str(),
});
}
Rule::After {
target,
codes,
} => {
if gated {
links.push(Link::Edge {
source,
origin: root.into(),
target: target.clone(),
direction: Direction::Incoming,
codes: codes.clone(),
});
} else {
links.push(Link::Edge {
source,
target: root.into(),
origin: target.clone(),
direction: Direction::Outgoing,
codes: codes.clone(),
});
}
}
}
links
}