#![feature(option_expect_none)]
extern crate petgraph;
use std::{
collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
io::Write,
};
use petgraph::{graph::NodeIndex, visit::DfsPostOrder, Direction};
use thiserror::Error;
use tokio::{sync::Semaphore, task::LocalSet};
mod build_task;
pub mod disk_interface;
mod interface;
#[cfg(test)]
mod property_tests;
mod rebuilder;
pub mod task;
use interface::BuildTask;
use build_task::{CommandTaskError, CommandTaskResult};
use disk_interface::SystemDiskInterface;
pub use rebuilder::{CachingMTimeRebuilder, DiskDirtyCache, RebuilderError};
use task::{Key, Task, Tasks};
type SchedulerGraph<'a> = petgraph::Graph<&'a Key, ()>;
/// Errors that can abort a scheduled build.
#[derive(Error, Debug)]
pub enum BuildError {
    /// A command task panicked inside the command pool.
    #[error("command pool panic")]
    CommandPoolPanic,
    /// A spawned command failed to start or exited unsuccessfully.
    #[error("command failed {0}")]
    CommandFailed(#[from] CommandTaskError),
    /// The rebuilder could not produce a build task for a key.
    #[error(transparent)]
    RebuilderError(#[from] anyhow::Error),
}
/// Tracks build progress and renders `[finished/total] command` status lines.
#[derive(Debug)]
struct Printer {
    // Number of tasks that have completed so far.
    finished: usize,
    // Number of tasks started so far (grows as tasks are scheduled).
    total: usize,
    // Terminal handle; in-place line rewriting is used only on real TTYs.
    console: console::Term,
}
impl Default for Printer {
    /// A printer with zeroed counters, writing to standard output.
    fn default() -> Self {
        Self {
            console: console::Term::stdout(),
            finished: 0,
            total: 0,
        }
    }
}
impl Printer {
    /// Render the `[finished/total] command` status line for a command task.
    ///
    /// Non-command tasks produce no output. On a real terminal the line is
    /// rewritten in place and truncated to the terminal width; otherwise each
    /// status goes on its own line.
    fn print_status(&mut self, task: &Task) {
        if !task.is_command() {
            return;
        }
        let command = task.command().unwrap().trim();
        if self.console.is_term() {
            let size = self.console.size_checked().map(|(w, _h)| w).unwrap_or(80);
            // Leave room for the "[x/y] " prefix. `saturating_sub` avoids a
            // usize underflow on terminals narrower than 10 columns, and
            // truncating by chars (not a byte slice) avoids the panics the
            // old `&command[..size - 10]` hit when the command was shorter
            // than the budget or the cut landed inside a multi-byte UTF-8
            // character.
            let budget = (size as usize).saturating_sub(10);
            let shown: String = command.chars().take(budget).collect();
            self.console.clear_line().expect("clear");
            write!(self.console, "[{}/{}] {}", self.finished, self.total, shown)
                .expect("write");
        } else {
            writeln!(
                self.console,
                "[{}/{}] {}",
                self.finished, self.total, command
            )
            .expect("write");
        }
    }

    /// Record that a command task was started and refresh the status line.
    fn started(&mut self, task: &Task) {
        self.total += 1;
        self.print_status(task);
    }

    /// Record task completion, dump its captured output, and panic if the
    /// command failed (failure currently aborts the whole build).
    fn finished(&mut self, task: &Task, result: CommandTaskResult) {
        self.finished += 1;
        self.print_status(task);
        match result {
            Ok(output) => {
                if !output.stdout.is_empty() {
                    write!(
                        self.console,
                        "\n{}",
                        std::str::from_utf8(&output.stdout).unwrap()
                    )
                    .unwrap();
                }
            }
            Err(err) => {
                writeln!(self.console, "\nFAILED\n{}", task.command().unwrap()).unwrap();
                match err {
                    err @ CommandTaskError::SpawnFailed(_) => {
                        writeln!(self.console, "Failed to spawn command: {}", err).unwrap();
                    }
                    CommandTaskError::CommandFailed(out) => {
                        // write_all instead of write: `write` may emit only a
                        // prefix of the buffer and silently drop the rest.
                        self.console.write_all(&out.stdout).unwrap();
                        self.console.write_all(&out.stderr).unwrap();
                    }
                }
                panic!("FAILED");
            }
        }
    }
}
impl Drop for Printer {
    /// Terminate the in-place status line with a newline on real terminals.
    fn drop(&mut self) {
        if !self.console.is_term() {
            return;
        }
        self.console.write_line("").unwrap();
    }
}
/// Bookkeeping for the topological scheduler's progress over the graph.
#[derive(Debug, Default)]
struct BuildState {
    // Total number of nodes registered via `add_node`.
    wanted: usize,
    // Nodes that have completed (successfully or via error propagation).
    finished: HashSet<NodeIndex>,
    // Nodes whose dependencies are all satisfied, awaiting execution.
    ready: VecDeque<NodeIndex>,
    // Nodes still blocked on at least one unfinished dependency.
    waiting_tasks: HashSet<NodeIndex>,
}
impl BuildState {
    /// True once every registered node has been marked finished.
    pub fn done(&self) -> bool {
        assert!(self.finished.len() <= self.wanted);
        self.wanted == self.finished.len()
    }

    /// Pop the next node whose dependencies are all satisfied, if any.
    pub fn next_ready(&mut self) -> Option<NodeIndex> {
        assert!(!self.done());
        self.ready.pop_front()
    }

    /// Register a node for building. Nodes with no outgoing (dependency)
    /// edges are immediately ready; everything else waits.
    pub fn add_node(&mut self, graph: &SchedulerGraph, node: NodeIndex) {
        self.wanted += 1;
        let has_deps = graph.edges_directed(node, Direction::Outgoing).count() > 0;
        if has_deps {
            self.waiting_tasks.insert(node);
        } else {
            self.ready.push_back(node);
        }
    }

    /// Promote to `ready` every waiting dependent whose dependencies have
    /// now all finished.
    fn finish_node_success(&mut self, graph: &SchedulerGraph, node: NodeIndex) {
        for dependent in graph.neighbors_directed(node, Direction::Incoming) {
            if !self.waiting_tasks.contains(&dependent) {
                debug_assert!(self.finished.contains(&dependent));
                continue;
            }
            debug_assert!(!self.finished.contains(&dependent));
            let unblocked = graph
                .neighbors_directed(dependent, Direction::Outgoing)
                .all(|dependency| self.finished.contains(&dependency));
            if unblocked {
                self.waiting_tasks.remove(&dependent);
                self.ready.push_back(dependent);
            }
        }
    }

    /// Transitively abandon every waiting dependent of a failed node by
    /// marking it finished without ever scheduling it.
    fn finish_node_error(&mut self, graph: &SchedulerGraph, node: NodeIndex) {
        for dependent in graph.neighbors_directed(node, Direction::Incoming) {
            if !self.waiting_tasks.contains(&dependent) {
                debug_assert!(self.finished.contains(&dependent));
                continue;
            }
            debug_assert!(!self.finished.contains(&dependent));
            self.waiting_tasks.remove(&dependent);
            self.finished.insert(dependent);
            self.finish_node_error(graph, dependent);
        }
    }

    /// Mark `node` finished and propagate readiness (on success) or
    /// cancellation (on failure) to its dependents.
    pub fn finish_node(&mut self, graph: &SchedulerGraph, node: NodeIndex, succeeded: bool) {
        self.finished.insert(node);
        if succeeded {
            self.finish_node_success(graph, node);
        } else {
            self.finish_node_error(graph, node);
        }
    }
}
/// A scheduler that executes build tasks in topological order with bounded
/// parallelism on a single-threaded tokio runtime.
#[derive(Debug)]
pub struct ParallelTopoScheduler {
    // Maximum number of commands allowed to run concurrently.
    parallelism: usize,
}
impl ParallelTopoScheduler {
pub fn new(parallelism: usize) -> Self {
ParallelTopoScheduler { parallelism }
}
fn build_graph(tasks: &Tasks, start: Option<Vec<Key>>) -> SchedulerGraph {
let mut keys_to_nodes: HashMap<&Key, NodeIndex> = HashMap::new();
let mut graph = SchedulerGraph::new();
fn add_or_get_node<'a>(
map: &mut HashMap<&'a Key, NodeIndex>,
graph: &mut SchedulerGraph<'a>,
key: &'a Key,
) -> NodeIndex {
match map.entry(key) {
Entry::Vacant(e) => {
let node = graph.add_node(key);
e.insert(node);
node
}
Entry::Occupied(e) => *e.get(),
}
}
let task_map = tasks.all_tasks();
if let Some(start) = start {
let mut queue = std::collections::VecDeque::from(start);
let mut visited = HashSet::new();
while !queue.is_empty() {
let key = queue.pop_front().unwrap();
if let Some((key, task)) = task_map.get_key_value(&key) {
let source = add_or_get_node(&mut keys_to_nodes, &mut graph, key);
if !visited.contains(&source) {
visited.insert(source);
for dep in task.dependencies().iter().chain(task.order_dependencies()) {
let dep_node = add_or_get_node(&mut keys_to_nodes, &mut graph, dep);
graph.add_edge(source, dep_node, ());
queue.push_back(dep.clone());
}
}
}
}
} else {
for (key, task) in task_map {
let source = add_or_get_node(&mut keys_to_nodes, &mut graph, key);
for dep in task.dependencies().iter().chain(task.order_dependencies()) {
let dep_node = add_or_get_node(&mut keys_to_nodes, &mut graph, dep);
graph.add_edge(source, dep_node, ());
}
}
}
graph
}
fn schedule_internal(
&self,
rebuilder: &impl interface::Rebuilder<Key, CommandTaskResult>,
tasks: &Tasks,
start: Option<Vec<Key>>,
) -> Result<(), BuildError> {
let graph = Self::build_graph(&tasks, start.clone());
let mut build_state = BuildState::default();
let mut printer = Printer::default();
let mut visitor = DfsPostOrder::empty(&graph);
let requested: Box<dyn Iterator<Item = NodeIndex>> = match start {
Some(keys) => {
let x = &graph;
Box::new(
graph
.node_indices()
.filter(move |idx| keys.contains(x[*idx])),
)
}
None => Box::new(graph.externals(Direction::Incoming)),
};
for start in requested {
visitor.move_to(start);
while let Some(node) = visitor.next(&graph) {
build_state.add_node(&graph, node);
}
}
let local_set = LocalSet::new();
let mut runtime = tokio::runtime::Builder::new()
.enable_all()
.basic_scheduler()
.enable_all()
.build()
.unwrap();
let mut pending = Vec::new();
let sem = Semaphore::new(self.parallelism);
local_set.block_on(&mut runtime, async {
while !build_state.done() {
if let Some(node) = build_state.next_ready() {
let key = graph[node];
if let Some(task) = tasks.task(key) {
let rebuilder_result = rebuilder.build(key.clone(), None, task);
if let Err(e) = rebuilder_result {
return Err(From::from(anyhow::Error::new(e)));
}
let build_task = rebuilder_result.unwrap();
printer.started(task);
let sem = &sem;
pending.push(Box::pin(async move {
let _p = sem.acquire().await;
futures::future::ready((node, build_task.run().await)).await
}));
} else {
build_state.finish_node(&graph, node, true);
}
continue;
}
let (finished, _, left) = futures::future::select_all(pending).await;
pending = left;
let (node, result) = finished;
build_state.finish_node(&graph, node, result.is_ok());
let key = graph[node];
let task = tasks.task(key);
printer.finished(task.unwrap(), result);
}
assert!(pending.is_empty());
Ok(())
})
}
}
impl interface::Scheduler<Key, CommandTaskResult> for ParallelTopoScheduler {
    type Error = BuildError;

    /// Build the given `start` targets and their transitive dependencies.
    fn schedule(
        &self,
        rebuilder: &impl interface::Rebuilder<Key, CommandTaskResult>,
        tasks: &Tasks,
        start: Vec<Key>,
    ) -> Result<(), Self::Error> {
        self.schedule_internal(rebuilder, tasks, Some(start))
    }

    /// Build every root target (keys nothing else depends on).
    fn schedule_externals(
        &self,
        rebuilder: &impl interface::Rebuilder<Key, CommandTaskResult>,
        tasks: &Tasks,
    ) -> Result<(), Self::Error> {
        self.schedule_internal(rebuilder, tasks, None)
    }
}
/// Build every externally-visible (root) target known to `tasks`.
///
/// # Errors
/// Propagates whatever error type the scheduler reports.
pub fn build_externals<K, V, Scheduler>(
    scheduler: Scheduler,
    rebuilder: &impl interface::Rebuilder<K, V>,
    tasks: &Tasks,
) -> Result<(), Scheduler::Error>
where
    Scheduler: interface::Scheduler<K, V>,
{
    // `Ok(expr?)` was a no-op round-trip; return the result directly.
    scheduler.schedule_externals(rebuilder, tasks)
}
/// Build the `start` targets (and their dependencies) using `scheduler`.
///
/// # Errors
/// Propagates whatever error type the scheduler reports.
pub fn build<K, V, Scheduler>(
    scheduler: Scheduler,
    rebuilder: &impl interface::Rebuilder<K, V>,
    tasks: &Tasks,
    start: Vec<K>,
) -> Result<(), Scheduler::Error>
where
    Scheduler: interface::Scheduler<K, V>,
{
    // `Ok(expr?)` was a no-op round-trip; return the result directly.
    scheduler.schedule(rebuilder, tasks, start)
}
/// Construct the default mtime-based rebuilder backed by the real filesystem.
pub fn caching_mtime_rebuilder() -> CachingMTimeRebuilder<DiskDirtyCache<SystemDiskInterface>> {
    let disk = SystemDiskInterface {};
    let dirty_cache = DiskDirtyCache::new(disk);
    CachingMTimeRebuilder::new(dirty_cache)
}