#![cfg(test)]
#![allow(dead_code)]
use std::collections::{BTreeMap, BTreeSet};
use std::num::NonZeroUsize;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::Mutex;
use haz_cache::Cache;
use haz_dag::edge::{Edge, EdgeKind};
use haz_dag::graph::TaskGraph;
use haz_domain::action::TaskAction;
use haz_domain::env::{EnvSettings, EnvVarName};
use haz_domain::name::{ProjectName, TagName, TaskName};
use haz_domain::path::{ProjectRoot, WorkspaceRootPath};
use haz_domain::project::Project;
use haz_domain::settings::WorkspaceSettings;
use haz_domain::settings::cache::HashAlgo;
use haz_domain::settings::concurrency::{ConcurrencyDefault, ConcurrencySettings};
use haz_domain::settings::execution::{CancelGrace, ExecutionSettings};
use haz_domain::task::Task;
use haz_domain::task_id::TaskId;
use haz_domain::workspace::Workspace;
use haz_vfs::MemFilesystem;
use nonempty::NonEmpty;
use tokio_util::sync::CancellationToken;
use crate::mock_impl::{MockProcessSpawner, MockSpec};
use crate::run_task::{
CancelledRecord, CompletedRecord, RunContext, RunObserver, RunOutcome, RunSource, RunState,
SkipCause, SkipRecord,
};
/// Host path string used as the workspace root by the helpers in this module
/// (the `Workspace` root in `make_workspace` and the directory/cache root in
/// `Fixture::new`).
pub(super) const WORKSPACE_HOST: &str = "/ws";
/// Parses `s` into a [`ProjectName`].
///
/// # Panics
///
/// Panics if `s` fails to parse as a `ProjectName`.
pub(super) fn project_name(s: &str) -> ProjectName {
    s.parse::<ProjectName>().unwrap()
}
/// Parses `s` into a [`TaskName`].
///
/// # Panics
///
/// Panics if `s` fails to parse as a `TaskName`.
pub(super) fn task_name_(s: &str) -> TaskName {
    s.parse::<TaskName>().unwrap()
}
/// Parses `s` into a [`TagName`].
///
/// # Panics
///
/// Panics if `s` fails to parse as a `TagName`.
pub(super) fn tag(s: &str) -> TagName {
    s.parse::<TagName>().unwrap()
}
/// Builds a [`TaskId`] from a project name and a task name given as strings.
///
/// # Panics
///
/// Panics if either string is rejected by `project_name` / `task_name_`.
pub(super) fn tid(project: &str, task: &str) -> TaskId {
    let project = project_name(project);
    let task = task_name_(task);
    TaskId { project, task }
}
/// Returns a `TaskAction::Command` whose argument vector is the single
/// element `"true"`.
pub(super) fn command_true() -> TaskAction {
    let argv = vec![String::from("true")];
    // The vector is non-empty by construction, so the conversion yields a value.
    let argv = NonEmpty::from_vec(argv).unwrap();
    TaskAction::Command(argv)
}
/// Builds a minimal [`Task`] called `name`.
///
/// The action is `command_true()`; inputs, outputs, deps and weak deps are
/// empty, there is no mutex, and the env settings are the default.
///
/// # Panics
///
/// Panics if `name` is rejected by `task_name_`.
pub(super) fn make_task(name: &str) -> Task {
    let name = task_name_(name);
    let action = command_true();
    Task {
        name,
        action,
        inputs: vec![],
        outputs: vec![],
        deps: vec![],
        weak_deps: vec![],
        mutex: None,
        env: EnvSettings::default(),
    }
}
/// Builds a [`Task`] called `name` that runs the command `cmd` and carries the
/// given optional `mutex`.
///
/// Inputs, outputs, deps and weak deps are empty; env settings are the default.
///
/// # Panics
///
/// Panics if `name` is rejected by `task_name_` or if `cmd` is empty (the
/// command must be convertible to a `NonEmpty` vector).
pub(super) fn make_task_with(
    name: &str,
    cmd: &[&str],
    mutex: Option<haz_domain::mutex::Mutex>,
) -> Task {
    let argv = cmd
        .iter()
        .copied()
        .map(str::to_owned)
        .collect::<Vec<String>>();
    let name = task_name_(name);
    let action = TaskAction::Command(NonEmpty::from_vec(argv).unwrap());
    Task {
        name,
        action,
        inputs: vec![],
        outputs: vec![],
        deps: vec![],
        weak_deps: vec![],
        mutex,
        env: EnvSettings::default(),
    }
}
/// Builds a [`Task`] called `name` that runs `command_true()` and declares the
/// given `outputs`.
///
/// Inputs, deps and weak deps are empty, there is no mutex, and the env
/// settings are the default.
///
/// # Panics
///
/// Panics if `name` is rejected by `task_name_`.
pub(super) fn make_task_with_outputs(
    name: &str,
    outputs: Vec<haz_domain::path::OutputSpec>,
) -> Task {
    let name = task_name_(name);
    let action = command_true();
    Task {
        name,
        action,
        inputs: vec![],
        outputs,
        deps: vec![],
        weak_deps: vec![],
        mutex: None,
        env: EnvSettings::default(),
    }
}
/// Builds a [`Task`] called `name` that runs `command_true()`, parsing its
/// inputs, outputs and hard deps from their string forms.
///
/// Weak deps are empty, there is no mutex, and the env settings are the
/// default.
///
/// # Panics
///
/// Panics if `name` is rejected by `task_name_`, or if any entry of `inputs`,
/// `outputs` or `deps` fails to parse as an `InputSpec`, `OutputSpec` or
/// `TaskRef` respectively.
pub(super) fn make_task_with_inputs_outputs(
    name: &str,
    inputs: Vec<&str>,
    outputs: Vec<&str>,
    deps: Vec<&str>,
) -> Task {
    let name = task_name_(name);
    let action = command_true();
    let inputs: Vec<_> = inputs
        .into_iter()
        .map(|raw| haz_domain::path::InputSpec::parse(raw).unwrap())
        .collect();
    let outputs: Vec<_> = outputs
        .into_iter()
        .map(|raw| haz_domain::path::OutputSpec::parse(raw).unwrap())
        .collect();
    let deps: Vec<_> = deps
        .into_iter()
        .map(|raw| haz_domain::task_ref::TaskRef::parse(raw).unwrap())
        .collect();
    Task {
        name,
        action,
        inputs,
        outputs,
        deps,
        weak_deps: Vec::new(),
        mutex: None,
        env: EnvSettings::default(),
    }
}
/// Parses `path` with `CanonicalPath::parse_workspace_absolute`.
///
/// # Panics
///
/// Panics if the parse fails.
pub(super) fn workspace_absolute_canonical(path: &str) -> haz_domain::path::CanonicalPath {
    let parsed = haz_domain::path::CanonicalPath::parse_workspace_absolute(path);
    parsed.unwrap()
}
/// Builds a [`Project`] called `name` with the given `tags` and `tasks`.
///
/// Tasks are keyed by their own `name`; if two tasks share a name, the later
/// one in `tasks` replaces the earlier. The project root is always
/// `ProjectRoot::WorkspaceRoot`.
///
/// # Panics
///
/// Panics if `name` is rejected by `project_name`.
pub(super) fn make_project(name: &str, tags: BTreeSet<TagName>, tasks: Vec<Task>) -> Project {
    let mut by_name: BTreeMap<TaskName, Task> = BTreeMap::new();
    for task in tasks {
        by_name.insert(task.name.clone(), task);
    }
    Project {
        name: project_name(name),
        root: ProjectRoot::WorkspaceRoot,
        tags,
        tasks: by_name,
    }
}
/// Builds a [`Workspace`] rooted at `WORKSPACE_HOST` from `projects` and
/// `settings`.
///
/// Projects are keyed by their own `name`; if two projects share a name, the
/// later one in `projects` replaces the earlier. The overlay map is empty.
///
/// # Panics
///
/// Panics if `WorkspaceRootPath::try_new` rejects `WORKSPACE_HOST`.
pub(super) fn make_workspace(projects: Vec<Project>, settings: WorkspaceSettings) -> Workspace {
    let mut by_name: BTreeMap<ProjectName, Project> = BTreeMap::new();
    for project in projects {
        by_name.insert(project.name.clone(), project);
    }
    let root = WorkspaceRootPath::try_new(PathBuf::from(WORKSPACE_HOST)).unwrap();
    Workspace {
        root,
        projects: by_name,
        overlays: BTreeMap::new(),
        settings,
    }
}
/// Builds a [`TaskGraph`] by collecting `nodes` and `edges` into the graph's
/// own collection types.
pub(super) fn make_graph(nodes: Vec<TaskId>, edges: Vec<Edge>) -> TaskGraph {
    let node_set = nodes.into_iter().collect();
    let edge_set = edges.into_iter().collect();
    TaskGraph {
        nodes: node_set,
        edges: edge_set,
    }
}
/// Builds an [`Edge`] from `from` to `to` with kind `EdgeKind::Hard`.
pub(super) fn h_edge(from: TaskId, to: TaskId) -> Edge {
    let kind = EdgeKind::Hard;
    Edge { from, to, kind }
}
/// Builds an [`Edge`] from `from` to `to` with kind `EdgeKind::Soft`.
pub(super) fn s_edge(from: TaskId, to: TaskId) -> Edge {
    let kind = EdgeKind::Soft;
    Edge { from, to, kind }
}
/// Wraps `n` in `ConcurrencyDefault::Fixed`.
///
/// # Panics
///
/// Panics if `n` is zero.
pub(super) fn fixed_cap(n: usize) -> ConcurrencyDefault {
    let cap = NonZeroUsize::new(n).unwrap();
    ConcurrencyDefault::Fixed(cap)
}
/// Returns default [`WorkspaceSettings`] with the concurrency section replaced
/// by the given `default` and an empty per-tag map.
pub(super) fn workspace_settings_with(default: ConcurrencyDefault) -> WorkspaceSettings {
    let concurrency = ConcurrencySettings {
        default,
        tags: BTreeMap::new(),
    };
    WorkspaceSettings {
        concurrency,
        ..WorkspaceSettings::default()
    }
}
/// Returns default [`WorkspaceSettings`] with the concurrency section set to
/// `default` (and an empty per-tag map) and the execution section's
/// `cancel_grace` built from `grace_secs`.
///
/// # Panics
///
/// Panics if `CancelGrace::try_from_secs_f64` rejects `grace_secs`.
pub(super) fn workspace_settings_with_grace(
    default: ConcurrencyDefault,
    grace_secs: f64,
) -> WorkspaceSettings {
    let concurrency = ConcurrencySettings {
        default,
        tags: BTreeMap::new(),
    };
    let cancel_grace = CancelGrace::try_from_secs_f64(grace_secs).unwrap();
    let execution = ExecutionSettings { cancel_grace };
    WorkspaceSettings {
        concurrency,
        execution,
        ..WorkspaceSettings::default()
    }
}
/// Returns default [`WorkspaceSettings`] with the concurrency section set to
/// `default` plus a single per-tag entry mapping `tag_name` to `cap`.
///
/// # Panics
///
/// Panics if `tag_name` is rejected by `tag` or if `cap` is zero.
pub(super) fn workspace_settings_with_tag_cap(
    default: ConcurrencyDefault,
    tag_name: &str,
    cap: usize,
) -> WorkspaceSettings {
    let tags = BTreeMap::from([(tag(tag_name), NonZeroUsize::new(cap).unwrap())]);
    let concurrency = ConcurrencySettings { default, tags };
    WorkspaceSettings {
        concurrency,
        ..WorkspaceSettings::default()
    }
}
/// Bundle of owned state that `make_ctx` / `make_ctx_with_cancel` borrow from
/// to assemble a `RunContext`.
pub(super) struct Fixture {
/// Cache backed by an in-memory filesystem; its `fs()` also supplies the
/// context's filesystem handle.
pub cache: Cache<MemFilesystem>,
/// Workspace definition handed to the run context.
pub workspace: Workspace,
/// Task graph handed to the run context.
pub graph: TaskGraph,
/// Host environment variables handed to the run context (empty after
/// `Fixture::new`).
pub host_env: BTreeMap<EnvVarName, String>,
/// Cancellation token that `make_ctx` passes to the run context.
pub cancel: CancellationToken,
}
impl Fixture {
    /// Creates a fixture around `workspace` and `graph`.
    ///
    /// A fresh `MemFilesystem` gets the `WORKSPACE_HOST` directory added and is
    /// then moved into a `Cache` rooted at the same path using
    /// `HashAlgo::Blake3`. The host environment starts empty and the
    /// cancellation token is newly created.
    ///
    /// # Panics
    ///
    /// Panics if adding the `WORKSPACE_HOST` directory to the in-memory
    /// filesystem fails.
    pub(super) fn new(workspace: Workspace, graph: TaskGraph) -> Self {
        let mut mem_fs = MemFilesystem::new();
        mem_fs.add_dir(WORKSPACE_HOST).unwrap();
        let root = Path::new(WORKSPACE_HOST);
        Self {
            cache: Cache::new(mem_fs, root, HashAlgo::Blake3),
            workspace,
            graph,
            host_env: BTreeMap::new(),
            cancel: CancellationToken::new(),
        }
    }
}
/// Builds a [`RunContext`] borrowing from `fixture`, `spawner` and `observer`,
/// using the fixture's own cancellation token.
///
/// Equivalent to calling `make_ctx_with_cancel` with `&fixture.cancel`.
pub(super) fn make_ctx<'a, O: RunObserver>(
    fixture: &'a Fixture,
    spawner: &'a MockProcessSpawner,
    observer: &'a O,
) -> RunContext<'a, MemFilesystem, MockProcessSpawner, O> {
    let cancel = &fixture.cancel;
    make_ctx_with_cancel(fixture, spawner, observer, cancel)
}
/// Builds a [`RunContext`] borrowing from `fixture`, `spawner` and `observer`,
/// with an explicitly supplied cancellation token.
///
/// The filesystem handle comes from `fixture.cache.fs()`, and the hash
/// algorithm is fixed to `HashAlgo::Blake3`. The fixture's own `cancel` token
/// is not used here.
pub(super) fn make_ctx_with_cancel<'a, O: RunObserver>(
    fixture: &'a Fixture,
    spawner: &'a MockProcessSpawner,
    observer: &'a O,
    cancel: &'a CancellationToken,
) -> RunContext<'a, MemFilesystem, MockProcessSpawner, O> {
    let cache = &fixture.cache;
    RunContext {
        fs: cache.fs(),
        cache,
        spawner,
        observer,
        workspace: &fixture.workspace,
        graph: &fixture.graph,
        host_env: &fixture.host_env,
        algo: HashAlgo::Blake3,
        cancel,
    }
}
/// One observer callback captured by `Recorder`, in the order it arrived.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(super) enum Event {
/// Recorded by `on_task_started`.
Started(TaskId),
/// Recorded by `on_task_finished`, with the record's `state` and `source`.
Finished(TaskId, RunState, RunSource),
/// Recorded by `on_task_skipped`, with the record's `cause`.
Skipped(TaskId, SkipCause),
/// Recorded by `on_task_cancelled`, with a clone of the whole record.
Cancelled(TaskId, CancelledRecord),
}
/// `RunObserver` implementation that appends an `Event` to an internal log for
/// each task-lifecycle callback (stdout/stderr callbacks are ignored).
#[derive(Default)]
pub(super) struct Recorder {
// Behind a mutex because the observer callbacks only receive `&self`.
events: Mutex<Vec<Event>>,
}
impl Recorder {
    /// Returns a snapshot (clone) of every event recorded so far, in order.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub(super) fn events(&self) -> Vec<Event> {
        let log = self.events.lock().unwrap();
        log.clone()
    }

    /// Returns the task ids of all `Event::Started` entries, in recorded order.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub(super) fn started_order(&self) -> Vec<TaskId> {
        let mut started = Vec::new();
        for event in self.events() {
            // Exhaustive on purpose: a new `Event` variant must be classified here.
            match event {
                Event::Started(id) => started.push(id),
                Event::Finished(..) | Event::Skipped(..) | Event::Cancelled(..) => {}
            }
        }
        started
    }
}
/// Records task-lifecycle callbacks as `Event`s; output callbacks are no-ops.
///
/// Every recording method panics if the internal mutex is poisoned.
impl RunObserver for Recorder {
    /// Appends `Event::Started` for `task`.
    fn on_task_started(&self, task: &TaskId) {
        let mut log = self.events.lock().unwrap();
        log.push(Event::Started(task.clone()));
    }

    /// Ignores stdout bytes.
    fn on_stdout(&self, _task: &TaskId, _bytes: &[u8]) {}

    /// Ignores stderr bytes.
    fn on_stderr(&self, _task: &TaskId, _bytes: &[u8]) {}

    /// Appends `Event::Finished` carrying the record's `state` and `source`.
    fn on_task_finished(&self, task: &TaskId, record: &CompletedRecord) {
        let mut log = self.events.lock().unwrap();
        log.push(Event::Finished(task.clone(), record.state, record.source));
    }

    /// Appends `Event::Skipped` carrying a clone of the record's `cause`.
    fn on_task_skipped(&self, task: &TaskId, record: &SkipRecord) {
        let mut log = self.events.lock().unwrap();
        log.push(Event::Skipped(task.clone(), record.cause.clone()));
    }

    /// Appends `Event::Cancelled` carrying a clone of the whole record.
    fn on_task_cancelled(&self, task: &TaskId, record: &CancelledRecord) {
        let mut log = self.events.lock().unwrap();
        log.push(Event::Cancelled(task.clone(), record.clone()));
    }
}
/// Returns the [`CompletedRecord`] stored for `task` in `outcomes`.
///
/// Marked `#[track_caller]` so a failure is reported at the calling test's
/// line rather than inside this helper.
///
/// # Panics
///
/// Panics if `task` has no outcome, or if its outcome is `Skipped` or
/// `Cancelled`; the message includes the task and the unexpected record.
#[track_caller]
pub(super) fn completed_for<'a>(
    outcomes: &'a BTreeMap<TaskId, RunOutcome>,
    task: &TaskId,
) -> &'a CompletedRecord {
    match outcomes.get(task) {
        Some(RunOutcome::Completed(record)) => record,
        Some(RunOutcome::Skipped(skip)) => {
            panic!("expected Completed for {task:?}, got Skipped({skip:?})")
        }
        Some(RunOutcome::Cancelled(cancel)) => {
            panic!("expected Completed for {task:?}, got Cancelled({cancel:?})")
        }
        None => panic!("no outcome for {task:?}"),
    }
}
/// Returns the [`SkipRecord`] stored for `task` in `outcomes`.
///
/// Marked `#[track_caller]` so a failure is reported at the calling test's
/// line rather than inside this helper.
///
/// # Panics
///
/// Panics if `task` has no outcome, or if its outcome is `Completed` or
/// `Cancelled`; the message includes the task and the unexpected record.
#[track_caller]
pub(super) fn skipped_for<'a>(
    outcomes: &'a BTreeMap<TaskId, RunOutcome>,
    task: &TaskId,
) -> &'a SkipRecord {
    match outcomes.get(task) {
        Some(RunOutcome::Skipped(record)) => record,
        Some(RunOutcome::Completed(rec)) => {
            panic!("expected Skipped for {task:?}, got Completed({rec:?})")
        }
        Some(RunOutcome::Cancelled(cancel)) => {
            panic!("expected Skipped for {task:?}, got Cancelled({cancel:?})")
        }
        None => panic!("no outcome for {task:?}"),
    }
}
/// Returns the [`CancelledRecord`] stored for `task` in `outcomes`.
///
/// Marked `#[track_caller]` so a failure is reported at the calling test's
/// line rather than inside this helper.
///
/// # Panics
///
/// Panics if `task` has no outcome, or if its outcome is `Completed` or
/// `Skipped`; the message includes the task and the unexpected record.
#[track_caller]
pub(super) fn cancelled_for<'a>(
    outcomes: &'a BTreeMap<TaskId, RunOutcome>,
    task: &TaskId,
) -> &'a CancelledRecord {
    match outcomes.get(task) {
        Some(RunOutcome::Cancelled(record)) => record,
        Some(RunOutcome::Completed(rec)) => {
            panic!("expected Cancelled for {task:?}, got Completed({rec:?})")
        }
        Some(RunOutcome::Skipped(skip)) => {
            panic!("expected Cancelled for {task:?}, got Skipped({skip:?})")
        }
        None => panic!("no outcome for {task:?}"),
    }
}
/// Pushes `n` default [`MockSpec`]s onto `spawner`, one `push_spec` call each.
pub(super) fn push_n_default_specs(spawner: &MockProcessSpawner, n: usize) {
    (0..n).for_each(|_| {
        spawner.push_spec(MockSpec::default());
    });
}
/// Pushes onto `spawner` a single [`MockSpec`] that is the default except for
/// its `exit_code`.
pub(super) fn push_spec_with_exit(spawner: &MockProcessSpawner, exit_code: i32) {
    let spec = MockSpec {
        exit_code,
        ..MockSpec::default()
    };
    spawner.push_spec(spec);
}
/// Returns a [`MockSpec`] whose behaviour is
/// `MockBehaviour::OnTerminate { graceful_exit_code }` and whose `exit_code`
/// is `137`; every other field is the default.
// NOTE(review): 137 is presumably the exit code used when the mock is not
// terminated gracefully; confirm against `MockProcessSpawner`.
pub(super) fn exit_on_terminate_spec(graceful_exit_code: i32) -> MockSpec {
    let behaviour = crate::mock_impl::MockBehaviour::OnTerminate { graceful_exit_code };
    MockSpec {
        behaviour,
        exit_code: 137,
        ..MockSpec::default()
    }
}
/// Returns the index of the first `Event::Started` for `task` in `events`.
///
/// Marked `#[track_caller]`, with the panic raised directly in the function
/// body (not in a closure), so a failure is reported at the calling test's
/// line rather than inside this helper.
///
/// # Panics
///
/// Panics if `events` contains no `Started` entry for `task`; the message
/// includes the task and the full event list.
#[track_caller]
pub(super) fn pos_started(events: &[Event], task: &TaskId) -> usize {
    let found = events
        .iter()
        .position(|e| matches!(e, Event::Started(t) if t == task));
    match found {
        Some(index) => index,
        None => panic!("expected Started({task:?}) in {events:?}"),
    }
}
/// Returns the index of the first `Event::Finished` for `task` in `events`.
///
/// Marked `#[track_caller]`, with the panic raised directly in the function
/// body (not in a closure), so a failure is reported at the calling test's
/// line rather than inside this helper.
///
/// # Panics
///
/// Panics if `events` contains no `Finished` entry for `task`; the message
/// includes the task and the full event list.
#[track_caller]
pub(super) fn pos_finished(events: &[Event], task: &TaskId) -> usize {
    let found = events
        .iter()
        .position(|e| matches!(e, Event::Finished(t, _, _) if t == task));
    match found {
        Some(index) => index,
        None => panic!("expected Finished({task:?}) in {events:?}"),
    }
}
/// Counts how many `Event::Started` entries in `events` refer to `task`.
pub(super) fn count_started(events: &[Event], task: &TaskId) -> usize {
    let mut total = 0;
    for event in events {
        if matches!(event, Event::Started(id) if id == task) {
            total += 1;
        }
    }
    total
}