use crate::client::workflow_graph::WorkflowGraph;
use crate::client::workflow_spec::{WorkflowActionSpec, WorkflowSpec};
use crate::models::{JobModel, WorkflowActionModel, WorkflowModel};
use regex::Regex;
use serde::Serialize;
use std::collections::{HashMap, HashSet};
/// A request to allocate scheduler resources, attached to an [`ExecutionEvent`].
///
/// In this file, values are produced only from workflow actions whose
/// `action_type` is `"schedule_nodes"`.
#[derive(Debug, Clone, Serialize)]
pub struct SchedulerAllocation {
/// Name of the scheduler the allocation is requested from.
pub scheduler: String,
/// Scheduler type string taken from the action (or its config).
pub scheduler_type: String,
/// Number of allocations to request; the builders in this file default it to 1
/// when the action does not specify a value.
pub num_allocations: i64,
/// Names of the jobs the action refers to.
pub jobs: Vec<String>,
}
/// What causes an [`ExecutionEvent`] to fire.
///
/// Serialized with serde's adjacent tagging: the variant name is written under
/// `"type"` and the variant payload under `"data"`.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type", content = "data")]
pub enum EventTrigger {
/// The workflow has just been started.
WorkflowStart,
/// All of the listed jobs (by name) have completed.
JobsComplete { jobs: Vec<String> },
}
/// One node of the execution plan: a trigger plus everything that happens when it fires.
#[derive(Debug, Clone, Serialize)]
pub struct ExecutionEvent {
/// Identifier of the event; also the key under which it is stored in
/// `ExecutionPlan::events`. The workflow-start event uses the id `"start"`.
pub id: String,
/// The condition that fires this event.
pub trigger: EventTrigger,
/// Human-readable rendering of `trigger`, used when printing the plan.
pub trigger_description: String,
/// Scheduler allocations requested when this event fires.
pub scheduler_allocations: Vec<SchedulerAllocation>,
/// Names of the jobs that become ready when this event fires.
pub jobs_becoming_ready: Vec<String>,
/// Ids of the events that made this event's trigger jobs ready.
pub depends_on_events: Vec<String>,
/// Ids of the events that list this event in their `depends_on_events`
/// (the reverse edges).
pub unlocks_events: Vec<String>,
}
/// Event-based execution plan for a workflow: a DAG of [`ExecutionEvent`]s.
#[derive(Debug, Serialize)]
pub struct ExecutionPlan {
/// All events of the plan, keyed by event id.
pub events: HashMap<String, ExecutionEvent>,
/// Ids of the events that depend on no other event.
pub root_events: Vec<String>,
/// Ids of the events that unlock no other event.
pub leaf_events: Vec<String>,
/// Job dependency graph the plan was derived from; excluded from serialization.
#[serde(skip)]
pub graph: Option<WorkflowGraph>,
}
/// A flattened view of the plan: all events of one traversal level merged into a
/// single stage (see `ExecutionPlan::to_stages`).
#[derive(Debug, Clone, Serialize)]
pub struct ExecutionStage {
/// Zero-based position of the stage in traversal order.
pub stage_number: usize,
/// Trigger descriptions of the merged events, joined with `" AND "`.
pub trigger_description: String,
/// Scheduler allocations of all merged events.
pub scheduler_allocations: Vec<SchedulerAllocation>,
/// Jobs becoming ready across all merged events.
pub jobs_becoming_ready: Vec<String>,
}
impl ExecutionPlan {
/// Builds an execution plan from a workflow specification.
///
/// The plan starts with a `"start"` event that readies the graph's root jobs and
/// carries the allocations of every `on_workflow_start` action. After that, the
/// graph is asked repeatedly which jobs are unblocked by the jobs processed so
/// far; the unblocked jobs are grouped by their (sorted) dependency list and one
/// event is created per group.
///
/// Event ids come from `generate_event_id`, which can return the same id for
/// different dependency sets. A numeric suffix (`_2`, `_3`, ...) is appended when
/// an id is already taken, so no event overwrites another. Groups are processed
/// in sorted order and all id lists are sorted, which makes the plan reproducible
/// between runs.
///
/// # Errors
///
/// Returns an error if the workflow graph cannot be built from `spec`, or if a
/// scheduler allocation cannot be built for one of the matching actions.
pub fn from_spec(spec: &WorkflowSpec) -> Result<Self, Box<dyn std::error::Error>> {
    let graph = WorkflowGraph::from_spec(spec)?;
    let mut events: HashMap<String, ExecutionEvent> = HashMap::new();
    let root_jobs: Vec<String> = graph.roots().iter().map(|s| s.to_string()).collect();

    // Allocations requested right at workflow start.
    let mut start_allocations = Vec::new();
    if let Some(ref actions) = spec.actions {
        for action in actions {
            if action.trigger_type == "on_workflow_start"
                && let Some(alloc) = build_scheduler_allocation(spec, action)?
            {
                start_allocations.push(alloc);
            }
        }
    }
    events.insert(
        "start".to_string(),
        ExecutionEvent {
            id: "start".to_string(),
            trigger: EventTrigger::WorkflowStart,
            trigger_description: "Workflow Start".to_string(),
            scheduler_allocations: start_allocations,
            jobs_becoming_ready: root_jobs.clone(),
            depends_on_events: vec![],
            unlocks_events: vec![],
        },
    );

    // Maps each job to the event that made it ready.
    let mut job_completion_event: HashMap<String, String> = root_jobs
        .iter()
        .map(|job| (job.clone(), "start".to_string()))
        .collect();
    let mut processed_jobs: HashSet<String> = root_jobs.iter().cloned().collect();

    loop {
        let newly_ready = graph.jobs_unblocked_by(&processed_jobs);
        if newly_ready.is_empty() {
            break;
        }

        // Jobs sharing exactly the same dependency set are released by one event.
        let mut jobs_by_deps: HashMap<Vec<String>, Vec<String>> = HashMap::new();
        for job in &newly_ready {
            if let Some(deps) = graph.dependencies_of(job) {
                let mut dep_list: Vec<String> = deps.iter().cloned().collect();
                dep_list.sort();
                jobs_by_deps.entry(dep_list).or_default().push(job.clone());
            }
        }

        // Fixed processing order so that collision suffixes are assigned reproducibly.
        let mut groups: Vec<(Vec<String>, Vec<String>)> = jobs_by_deps.into_iter().collect();
        groups.sort_by(|a, b| a.0.cmp(&b.0));

        for (deps, jobs_becoming_ready) in groups {
            // Different dependency sets can map to the same generated id; pick a
            // free one instead of overwriting the existing event.
            let base_id = generate_event_id(&deps);
            let mut event_id = base_id.clone();
            let mut suffix = 2;
            while events.contains_key(&event_id) {
                event_id = format!("{}_{}", base_id, suffix);
                suffix += 1;
            }

            let mut depends_on_events: Vec<String> = deps
                .iter()
                .filter_map(|dep| job_completion_event.get(dep).cloned())
                .collect::<HashSet<String>>()
                .into_iter()
                .collect();
            depends_on_events.sort();

            let mut scheduler_allocations = Vec::new();
            if let Some(ref actions) = spec.actions {
                let matching = graph.matching_actions(&jobs_becoming_ready, actions);
                for action in matching {
                    if let Some(alloc) = build_scheduler_allocation(spec, action)? {
                        scheduler_allocations.push(alloc);
                    }
                }
            }

            for job in &jobs_becoming_ready {
                job_completion_event.insert(job.clone(), event_id.clone());
            }

            let trigger_description = build_trigger_description(&deps);
            events.insert(
                event_id.clone(),
                ExecutionEvent {
                    id: event_id,
                    trigger: EventTrigger::JobsComplete { jobs: deps },
                    trigger_description,
                    scheduler_allocations,
                    jobs_becoming_ready,
                    depends_on_events,
                    unlocks_events: vec![],
                },
            );
        }
        processed_jobs.extend(newly_ready);
    }

    // Fill in the reverse edges (`unlocks_events`) from `depends_on_events`.
    let mut event_deps: Vec<(String, Vec<String>)> = events
        .values()
        .map(|e| (e.id.clone(), e.depends_on_events.clone()))
        .collect();
    event_deps.sort();
    for (event_id, deps) in event_deps {
        for dep_event_id in deps {
            if let Some(dep_event) = events.get_mut(&dep_event_id)
                && !dep_event.unlocks_events.contains(&event_id)
            {
                dep_event.unlocks_events.push(event_id.clone());
            }
        }
    }

    let mut root_events: Vec<String> = events
        .values()
        .filter(|e| e.depends_on_events.is_empty())
        .map(|e| e.id.clone())
        .collect();
    root_events.sort();
    let mut leaf_events: Vec<String> = events
        .values()
        .filter(|e| e.unlocks_events.is_empty())
        .map(|e| e.id.clone())
        .collect();
    leaf_events.sort();

    Ok(ExecutionPlan {
        events,
        root_events,
        leaf_events,
        graph: Some(graph),
    })
}
/// Builds an execution plan from database models of an existing workflow.
///
/// The dependency graph is built from `jobs` and `resource_requirements`. The
/// plan starts with a `"start"` event that readies the graph's root jobs and
/// carries the allocations of every `on_workflow_start` action. Jobs whose
/// dependencies have all been processed are then released level by level,
/// grouped by their (sorted) dependency list, with one event per group. An
/// `on_jobs_ready` action contributes an allocation to an event when at least
/// one of its `job_ids` belongs to a job released by that event.
///
/// Event ids come from `generate_event_id`, which can return the same id for
/// different dependency sets. A numeric suffix (`_2`, `_3`, ...) is appended when
/// an id is already taken, so no event overwrites another. Groups are processed
/// in sorted order and all id lists are sorted, which makes the plan reproducible
/// between runs.
///
/// `_workflow` is currently unused.
///
/// # Errors
///
/// Returns an error if the workflow graph cannot be built, or if a matching
/// `schedule_nodes` action has no `scheduler_type` in its config.
pub fn from_database_models(
    _workflow: &WorkflowModel,
    jobs: &[JobModel],
    actions: &[WorkflowActionModel],
    slurm_schedulers: &[crate::models::SlurmSchedulerModel],
    resource_requirements: &[crate::models::ResourceRequirementsModel],
) -> Result<Self, Box<dyn std::error::Error>> {
    let graph = WorkflowGraph::from_jobs(jobs, resource_requirements)?;

    // Only schedulers that have both an id and a name can be resolved by id.
    let scheduler_id_to_name: HashMap<i64, String> = slurm_schedulers
        .iter()
        .filter_map(|s| match (s.id, &s.name) {
            (Some(id), Some(name)) => Some((id, name.clone())),
            _ => None,
        })
        .collect();
    let job_name_to_id: HashMap<String, i64> = jobs
        .iter()
        .filter_map(|j| j.id.map(|id| (j.name.clone(), id)))
        .collect();
    let dependency_graph_by_name: HashMap<String, HashSet<String>> = jobs
        .iter()
        .map(|j| {
            let deps = graph.dependencies_of(&j.name).cloned().unwrap_or_default();
            (j.name.clone(), deps)
        })
        .collect();

    let mut events: HashMap<String, ExecutionEvent> = HashMap::new();
    let root_jobs: Vec<String> = graph.roots().iter().map(|s| s.to_string()).collect();

    // Allocations requested right at workflow start.
    let mut start_allocations = Vec::new();
    for action in actions {
        if action.trigger_type == "on_workflow_start"
            && let Some(alloc) =
                build_scheduler_allocation_from_db_action(action, jobs, &scheduler_id_to_name)?
        {
            start_allocations.push(alloc);
        }
    }
    events.insert(
        "start".to_string(),
        ExecutionEvent {
            id: "start".to_string(),
            trigger: EventTrigger::WorkflowStart,
            trigger_description: "Workflow Start".to_string(),
            scheduler_allocations: start_allocations,
            jobs_becoming_ready: root_jobs.clone(),
            depends_on_events: vec![],
            unlocks_events: vec![],
        },
    );

    // Maps each job to the event that made it ready.
    let mut job_completion_event: HashMap<String, String> = root_jobs
        .iter()
        .map(|job| (job.clone(), "start".to_string()))
        .collect();
    let mut processed_jobs: HashSet<String> = root_jobs.iter().cloned().collect();

    loop {
        // A job is released once it has dependencies and all of them are processed.
        let newly_ready: Vec<String> = jobs
            .iter()
            .filter(|job| !processed_jobs.contains(&job.name))
            .filter(|job| {
                dependency_graph_by_name.get(&job.name).is_some_and(|deps| {
                    !deps.is_empty() && deps.iter().all(|d| processed_jobs.contains(d))
                })
            })
            .map(|job| job.name.clone())
            .collect();
        if newly_ready.is_empty() {
            break;
        }

        // Jobs sharing exactly the same dependency set are released by one event.
        let mut jobs_by_deps: HashMap<Vec<String>, Vec<String>> = HashMap::new();
        for job_name in &newly_ready {
            if let Some(deps) = dependency_graph_by_name.get(job_name) {
                let mut dep_list: Vec<String> = deps.iter().cloned().collect();
                dep_list.sort();
                jobs_by_deps
                    .entry(dep_list)
                    .or_default()
                    .push(job_name.clone());
            }
        }

        // Fixed processing order so that collision suffixes are assigned reproducibly.
        let mut groups: Vec<(Vec<String>, Vec<String>)> = jobs_by_deps.into_iter().collect();
        groups.sort_by(|a, b| a.0.cmp(&b.0));

        for (deps, jobs_becoming_ready) in groups {
            // Different dependency sets can map to the same generated id; pick a
            // free one instead of overwriting the existing event.
            let base_id = generate_event_id(&deps);
            let mut event_id = base_id.clone();
            let mut suffix = 2;
            while events.contains_key(&event_id) {
                event_id = format!("{}_{}", base_id, suffix);
                suffix += 1;
            }

            let mut depends_on_events: Vec<String> = deps
                .iter()
                .filter_map(|dep| job_completion_event.get(dep).cloned())
                .collect::<HashSet<String>>()
                .into_iter()
                .collect();
            depends_on_events.sort();

            // Database ids of the jobs released by this event, for action matching.
            let ready_job_ids: HashSet<i64> = jobs_becoming_ready
                .iter()
                .filter_map(|job_name| job_name_to_id.get(job_name).copied())
                .collect();
            let mut scheduler_allocations = Vec::new();
            for action in actions {
                if action.trigger_type != "on_jobs_ready" {
                    continue;
                }
                let matches_any = action
                    .job_ids
                    .iter()
                    .flatten()
                    .any(|action_job_id| ready_job_ids.contains(action_job_id));
                if matches_any
                    && let Some(alloc) = build_scheduler_allocation_from_db_action(
                        action,
                        jobs,
                        &scheduler_id_to_name,
                    )?
                {
                    scheduler_allocations.push(alloc);
                }
            }

            for job in &jobs_becoming_ready {
                job_completion_event.insert(job.clone(), event_id.clone());
            }

            let trigger_description = build_trigger_description(&deps);
            events.insert(
                event_id.clone(),
                ExecutionEvent {
                    id: event_id,
                    trigger: EventTrigger::JobsComplete { jobs: deps },
                    trigger_description,
                    scheduler_allocations,
                    jobs_becoming_ready,
                    depends_on_events,
                    unlocks_events: vec![],
                },
            );
        }
        processed_jobs.extend(newly_ready);
    }

    // Fill in the reverse edges (`unlocks_events`) from `depends_on_events`.
    let mut event_deps: Vec<(String, Vec<String>)> = events
        .values()
        .map(|e| (e.id.clone(), e.depends_on_events.clone()))
        .collect();
    event_deps.sort();
    for (event_id, deps) in event_deps {
        for dep_event_id in deps {
            if let Some(dep_event) = events.get_mut(&dep_event_id)
                && !dep_event.unlocks_events.contains(&event_id)
            {
                dep_event.unlocks_events.push(event_id.clone());
            }
        }
    }

    let mut root_events: Vec<String> = events
        .values()
        .filter(|e| e.depends_on_events.is_empty())
        .map(|e| e.id.clone())
        .collect();
    root_events.sort();
    let mut leaf_events: Vec<String> = events
        .values()
        .filter(|e| e.unlocks_events.is_empty())
        .map(|e| e.id.clone())
        .collect();
    leaf_events.sort();

    Ok(ExecutionPlan {
        events,
        root_events,
        leaf_events,
        graph: Some(graph),
    })
}
/// Prints the execution plan to standard output.
///
/// Events are printed level by level starting from `root_events`; an event is
/// printed only after every event it depends on has been printed. If a pass
/// makes no progress (for example because of a dependency cycle or a dependency
/// id that is not present in `events`), the traversal stops instead of looping
/// forever and a warning is written to standard error.
pub fn display(&self) {
    let banner = "=".repeat(80);
    println!("\n{}", banner);
    println!("Workflow Execution Plan (DAG)");
    println!("{}", banner);

    if let Some(ref graph) = self.graph {
        // Work on a clone: `connected_components` is called on a mutable binding.
        let mut graph_clone = graph.clone();
        let components = graph_clone.connected_components();
        if components.len() > 1 {
            println!(
                "\nNote: Workflow has {} independent sub-workflows that can run in parallel",
                components.len()
            );
        }
    }

    println!("\nEvents: {} total", self.events.len());
    println!("Root events: {}", self.root_events.join(", "));
    println!("Leaf events: {}", self.leaf_events.join(", "));

    let mut displayed: HashSet<String> = HashSet::new();
    let mut queue: Vec<String> = self.root_events.clone();
    while !queue.is_empty() {
        let mut next_queue = Vec::new();
        let mut events_to_display = Vec::new();
        for event_id in &queue {
            if displayed.contains(event_id) {
                continue;
            }
            if let Some(event) = self.events.get(event_id) {
                if event
                    .depends_on_events
                    .iter()
                    .all(|d| displayed.contains(d))
                {
                    events_to_display.push(event_id.clone());
                } else {
                    // Not all prerequisites shown yet; retry on the next pass.
                    next_queue.push(event_id.clone());
                }
            }
        }

        // No event became displayable: the next pass would be identical to this
        // one, so the remaining events can never be shown.
        if events_to_display.is_empty() {
            break;
        }

        for event_id in events_to_display {
            let Some(event) = self.events.get(&event_id) else {
                continue;
            };
            let symbol = match &event.trigger {
                EventTrigger::WorkflowStart => "▶",
                EventTrigger::JobsComplete { .. } => "→",
            };
            println!("\n{} {}", symbol, event.trigger_description);
            println!("{}", "-".repeat(80));

            if !event.depends_on_events.is_empty() {
                let dep_descriptions: Vec<String> = event
                    .depends_on_events
                    .iter()
                    .filter_map(|id| self.events.get(id))
                    .map(|e| format!("'{}'", e.trigger_description))
                    .collect();
                println!("  After: {}", dep_descriptions.join(", "));
            }
            if !event.unlocks_events.is_empty() {
                let unlock_descriptions: Vec<String> = event
                    .unlocks_events
                    .iter()
                    .filter_map(|id| self.events.get(id))
                    .map(|e| format!("'{}'", e.trigger_description))
                    .collect();
                println!("  Then: {}", unlock_descriptions.join(", "));
            }
            if !event.scheduler_allocations.is_empty() {
                println!("\n  Scheduler Allocations:");
                for alloc in &event.scheduler_allocations {
                    println!(
                        "    • {} ({}) - {} allocation(s)",
                        alloc.scheduler, alloc.scheduler_type, alloc.num_allocations
                    );
                    let compact = compact_job_list(&alloc.jobs);
                    println!("      For jobs: {}", compact.join(", "));
                }
            }
            if !event.jobs_becoming_ready.is_empty() {
                println!("\n  Jobs Becoming Ready:");
                let compact = compact_job_list(&event.jobs_becoming_ready);
                for item in compact {
                    println!("    • {}", item);
                }
            }

            displayed.insert(event_id.clone());
            for unlocked in &event.unlocks_events {
                if !displayed.contains(unlocked) && !next_queue.contains(unlocked) {
                    next_queue.push(unlocked.clone());
                }
            }
        }
        queue = next_queue;
    }

    if displayed.len() < self.events.len() {
        eprintln!(
            "Warning: {} event(s) could not be ordered and were not displayed",
            self.events.len() - displayed.len()
        );
    }

    println!("\n{}", banner);
    println!("Total Events: {}", self.events.len());
    println!("{}\n", banner);
}
pub fn workflow_graph(&self) -> Option<&WorkflowGraph> {
self.graph.as_ref()
}
/// Flattens the event DAG into a linear list of stages.
///
/// Starting from `root_events`, every pass collects the queued events whose
/// dependencies have all been emitted already and merges them into one
/// [`ExecutionStage`]: allocations and ready jobs are concatenated, trigger
/// descriptions are joined with `" AND "`. Stage numbers start at 0.
///
/// If a pass cannot emit any event (for example because of a dependency cycle or
/// a dependency id that is not present in `events`), the traversal stops and the
/// stages built so far are returned, instead of looping forever.
pub fn to_stages(&self) -> Vec<ExecutionStage> {
    let mut stages = Vec::new();
    let mut emitted: HashSet<String> = HashSet::new();
    let mut queue: Vec<String> = self.root_events.clone();

    while !queue.is_empty() {
        let mut next_queue = Vec::new();
        let mut events_at_level = Vec::new();
        for event_id in &queue {
            if emitted.contains(event_id) {
                continue;
            }
            if let Some(event) = self.events.get(event_id) {
                if event.depends_on_events.iter().all(|d| emitted.contains(d)) {
                    events_at_level.push(event_id.clone());
                } else {
                    // Not all prerequisites emitted yet; retry on the next pass.
                    next_queue.push(event_id.clone());
                }
            }
        }

        // No progress: the next pass would see the same queue and the same
        // emitted set, so stop rather than spin.
        if events_at_level.is_empty() {
            break;
        }

        let mut merged_allocations = Vec::new();
        let mut merged_jobs = Vec::new();
        let mut trigger_parts = Vec::new();
        for event_id in &events_at_level {
            let Some(event) = self.events.get(event_id) else {
                continue;
            };
            merged_allocations.extend(event.scheduler_allocations.iter().cloned());
            merged_jobs.extend(event.jobs_becoming_ready.iter().cloned());
            trigger_parts.push(event.trigger_description.clone());
            emitted.insert(event_id.clone());
            for unlocked in &event.unlocks_events {
                if !emitted.contains(unlocked) && !next_queue.contains(unlocked) {
                    next_queue.push(unlocked.clone());
                }
            }
        }

        stages.push(ExecutionStage {
            stage_number: stages.len(),
            // Joining a single element yields that element unchanged.
            trigger_description: trigger_parts.join(" AND "),
            scheduler_allocations: merged_allocations,
            jobs_becoming_ready: merged_jobs,
        });
        queue = next_queue;
    }
    stages
}
}
/// Renders a human-readable description of a "jobs complete" trigger.
///
/// One job is named on its own, up to three jobs are all listed, and for larger
/// sets only the count and the first two names are shown.
fn build_trigger_description(level_jobs: &[String]) -> String {
    // Quotes each name and joins the result with ", ".
    let quote_joined = |names: &[String]| -> String {
        let quoted: Vec<String> = names.iter().map(|name| format!("'{}'", name)).collect();
        quoted.join(", ")
    };

    match level_jobs {
        [only] => format!("When job '{}' completes", only),
        few if few.len() <= 3 => format!("When jobs {} complete", quote_joined(few)),
        many => format!(
            "When {} jobs complete ({}...)",
            many.len(),
            quote_joined(&many[..2])
        ),
    }
}
/// Derives an event id from the names of the jobs that trigger the event.
///
/// * no jobs: `"start"`
/// * one job: `"after_<job>"`
/// * several jobs sharing a prefix of at least three characters:
///   `"after_<prefix>_jobs"`, where trailing `_` and ASCII digits are stripped
///   from the prefix (and at least three bytes must remain)
/// * otherwise: `"after_<first job>_and_<n>_more"`
///
/// The id is not guaranteed to be unique: different job sets with the same
/// common prefix produce the same id.
fn generate_event_id(trigger_jobs: &[String]) -> String {
    let (first, rest) = match trigger_jobs {
        [] => return "start".to_string(),
        [only] => return format!("after_{}", only),
        [first, rest @ ..] => (first, rest),
    };

    // Length, in characters, of the prefix shared by all trigger jobs.
    let shared_chars = rest
        .iter()
        .map(|job| {
            first
                .chars()
                .zip(job.chars())
                .take_while(|(a, b)| a == b)
                .count()
        })
        .min()
        .unwrap_or(0);

    if shared_chars >= 3 {
        // Strings are sliced by byte offset, so convert the character count to
        // the byte offset of the first non-shared character. Slicing with the
        // character count directly cuts multi-byte names short or panics on a
        // non-boundary index.
        let shared_bytes = first
            .char_indices()
            .nth(shared_chars)
            .map_or(first.len(), |(offset, _)| offset);
        let clean_prefix =
            first[..shared_bytes].trim_end_matches(|c: char| c == '_' || c.is_ascii_digit());
        if clean_prefix.len() >= 3 {
            return format!("after_{}_jobs", clean_prefix);
        }
    }

    format!("after_{}_and_{}_more", first, rest.len())
}
fn get_matching_jobs(
spec: &WorkflowSpec,
action: &WorkflowActionSpec,
) -> Result<Vec<String>, Box<dyn std::error::Error>> {
let mut matched = Vec::new();
if let Some(ref names) = action.jobs {
matched.extend(names.clone());
}
if let Some(ref regexes) = action.job_name_regexes {
for regex_str in regexes {
let re = Regex::new(regex_str)?;
for job in &spec.jobs {
if re.is_match(&job.name) && !matched.contains(&job.name) {
matched.push(job.name.clone());
}
}
}
}
Ok(matched)
}
/// Turns a workflow-spec action into a [`SchedulerAllocation`].
///
/// Returns `Ok(None)` for any action whose `action_type` is not
/// `"schedule_nodes"`. `num_allocations` falls back to 1 when the action does
/// not set it.
///
/// # Errors
///
/// Returns an error if a `schedule_nodes` action lacks `scheduler` or
/// `scheduler_type`, or if resolving the action's jobs fails.
fn build_scheduler_allocation(
    spec: &WorkflowSpec,
    action: &WorkflowActionSpec,
) -> Result<Option<SchedulerAllocation>, Box<dyn std::error::Error>> {
    if action.action_type != "schedule_nodes" {
        return Ok(None);
    }

    let Some(scheduler) = action.scheduler.clone() else {
        return Err("schedule_nodes action missing scheduler".into());
    };
    let Some(scheduler_type) = action.scheduler_type.clone() else {
        return Err("schedule_nodes action missing scheduler_type".into());
    };

    let allocation = SchedulerAllocation {
        scheduler,
        scheduler_type,
        num_allocations: action.num_allocations.unwrap_or(1),
        jobs: get_matching_jobs(spec, action)?,
    };
    Ok(Some(allocation))
}
/// Turns a database workflow action into a [`SchedulerAllocation`].
///
/// Returns `Ok(None)` for any action whose `action_type` is not
/// `"schedule_nodes"`. The action's `job_ids` are resolved to job names through
/// `workflow_jobs`; ids without a matching job are skipped. The scheduler name
/// is looked up by the config's `scheduler_id`; when the id is absent or
/// unknown, `"<scheduler_type>_scheduler"` is used. `num_allocations` falls back
/// to 1 when the config does not hold an integer for it.
///
/// # Errors
///
/// Returns an error if the action config has no string `scheduler_type`.
fn build_scheduler_allocation_from_db_action(
    action: &WorkflowActionModel,
    workflow_jobs: &[JobModel],
    scheduler_id_to_name: &HashMap<i64, String>,
) -> Result<Option<SchedulerAllocation>, Box<dyn std::error::Error>> {
    if action.action_type != "schedule_nodes" {
        return Ok(None);
    }

    let config = &action.action_config;
    let Some(scheduler_type) = config["scheduler_type"].as_str().map(str::to_string) else {
        return Err("Action config missing scheduler_type".into());
    };
    let num_allocations = config["num_allocations"].as_i64().unwrap_or(1);

    let jobs: Vec<String> = action
        .job_ids
        .iter()
        .flatten()
        .filter_map(|job_id| workflow_jobs.iter().find(|j| j.id == Some(*job_id)))
        .map(|job| job.name.clone())
        .collect();

    let scheduler = config["scheduler_id"]
        .as_i64()
        .and_then(|scheduler_id| scheduler_id_to_name.get(&scheduler_id).cloned())
        .unwrap_or_else(|| format!("{}_scheduler", scheduler_type));

    Ok(Some(SchedulerAllocation {
        scheduler,
        scheduler_type,
        num_allocations,
        jobs,
    }))
}
fn compact_job_list(jobs: &[String]) -> Vec<String> {
if jobs.is_empty() {
return vec![];
}
if jobs.len() <= 3 {
return jobs.to_vec();
}
let pattern_re = Regex::new(r"^(.+?)(\d+)$").unwrap();
#[derive(Debug)]
struct JobGroup {
prefix: String,
numbers: Vec<(usize, String)>, }
let mut groups: HashMap<String, JobGroup> = HashMap::new();
let mut non_pattern_jobs = Vec::new();
for job in jobs {
if let Some(caps) = pattern_re.captures(job) {
let prefix = caps.get(1).unwrap().as_str().to_string();
let num_str = caps.get(2).unwrap().as_str();
let num_val = num_str.parse::<usize>().unwrap();
groups
.entry(prefix.clone())
.or_insert_with(|| JobGroup {
prefix: prefix.clone(),
numbers: Vec::new(),
})
.numbers
.push((num_val, num_str.to_string()));
} else {
non_pattern_jobs.push(job.clone());
}
}
let mut result = Vec::new();
for (_, mut group) in groups {
group.numbers.sort_by_key(|&(n, _)| n);
if group.numbers.len() <= 3 {
for (_, num_str) in &group.numbers {
result.push(format!("{}{}", group.prefix, num_str));
}
} else {
let is_sequential = group.numbers.windows(2).all(|w| w[1].0 == w[0].0 + 1);
if is_sequential {
let first_num = &group.numbers.first().unwrap().1;
let last_num = &group.numbers.last().unwrap().1;
result.push(format!("{}{{{}..{}}}", group.prefix, first_num, last_num));
} else {
let mut runs = Vec::new();
let mut current_run = vec![group.numbers[0].clone()];
for i in 1..group.numbers.len() {
if group.numbers[i].0 == current_run.last().unwrap().0 + 1 {
current_run.push(group.numbers[i].clone());
} else {
runs.push(current_run);
current_run = vec![group.numbers[i].clone()];
}
}
runs.push(current_run);
for run in runs {
if run.len() >= 4 {
let first_num = &run.first().unwrap().1;
let last_num = &run.last().unwrap().1;
result.push(format!("{}{{{}..{}}}", group.prefix, first_num, last_num));
} else {
for (_, num_str) in &run {
result.push(format!("{}{}", group.prefix, num_str));
}
}
}
}
}
}
result.extend(non_pattern_jobs);
result
}