#![warn(missing_docs)]
#![forbid(unsafe_code)]
pub mod error;
mod pipeline;
pub use crate::pipeline::api::{
dag::cmd_dag, delete::cmd_delete, export::cmd_export, import::cmd_import, list::cmd_list,
new::cmd_new, run::cmd_run, step_dependency::cmd_step_dependency, step_new::cmd_step_new,
step_output::cmd_step_output, step_show::cmd_step_show, step_update::cmd_step_update,
update::cmd_update,
};
use clap::Parser;
use clap_complete::ArgValueCompleter;
use pipeline::api::dag::DagCLI;
use pipeline::api::delete::DeleteCLI;
use pipeline::api::export::ExportCLI;
use pipeline::api::import::ImportCLI;
use pipeline::api::new::NewCLI;
use pipeline::api::update::UpdateCLI;
pub use pipeline::deps;
use pipeline::step::handle_step_cli;
use pipeline::step::StepCLI;
use pipeline::util::pipeline_name_completer;
use serde::{Deserialize, Serialize};
use std::io::BufRead;
use std::str::FromStr;
use xvc_core::XvcConfigResult;
use xvc_core::XvcOutputSender;
use xvc_core::XvcStore;
use xvc_core::{conf, FromConfigKey, UpdateFromXvcConfig, XvcConfig};
use xvc_core::XvcPath;
use xvc_core::XvcRoot;
use xvc_core::{self, persist, XvcEntity};
use crate::error::{Error, Result};
pub use crate::pipeline::command::CommandProcess;
pub use crate::pipeline::command::XvcStepCommand;
pub use crate::pipeline::deps::{param::XvcParamFormat, XvcDependency};
pub use crate::pipeline::outs::XvcMetricsFormat;
pub use crate::pipeline::outs::XvcOutput;
pub use crate::pipeline::schema::XvcPipelineSchema;
pub use crate::pipeline::schema::XvcStepSchema;
pub use crate::pipeline::step::XvcStep;
use crate::pipeline::XvcStepInvalidate;
pub use crate::pipeline::api::run::RunCLI;
/// Pipeline management commands
#[derive(Debug, Parser, Clone)]
#[command(name = "pipeline")]
pub struct PipelineCLI {
    /// Name of the pipeline this command applies to
    ///
    /// When omitted, the name configured under `pipeline.default` is used.
    // `global = true` lets the option be given after any subcommand as well.
    #[arg(long, short, global = true, add = ArgValueCompleter::new(pipeline_name_completer))]
    pub pipeline_name: Option<String>,

    /// Subcommand to run for the selected pipeline
    #[command(subcommand)]
    pub subcommand: PipelineSubCommand,
}
/// Subcommands of the pipeline command
// Every variant is dispatched to its handler by `cmd_pipeline`; the variant
// order below is also the order clap lists the subcommands in.
#[derive(Debug, Clone, Parser)]
#[command()]
// The option structs are kept inline in the variants: boxing them to satisfy
// the lint would change the public variant types.
#[allow(clippy::large_enum_variant)]
pub enum PipelineSubCommand {
    /// Create a new pipeline
    #[command(visible_aliases=&["n"])]
    New(NewCLI),

    /// Update a pipeline
    #[command(visible_aliases=&["u"])]
    Update(UpdateCLI),

    /// Delete a pipeline
    #[command(visible_aliases=&["D"])]
    Delete(DeleteCLI),

    /// Run a pipeline
    #[command(visible_aliases=&["r"])]
    Run(RunCLI),

    /// List pipelines
    #[command(visible_aliases=&["l"])]
    List,

    /// Generate the DAG of a pipeline
    #[command(visible_aliases=&["d"])]
    Dag(DagCLI),

    /// Export a pipeline
    #[command(visible_aliases=&["e"])]
    Export(ExportCLI),

    /// Import a pipeline
    #[command(visible_aliases=&["i"])]
    Import(ImportCLI),

    /// Manage the steps of a pipeline
    #[command(visible_aliases=&["s"])]
    Step(StepCLI),
}
impl UpdateFromXvcConfig for PipelineCLI {
    /// Fills in the pipeline name from the configuration when none was given.
    ///
    /// A name already present in `pipeline_name` is kept as is; otherwise the
    /// name of the `XvcPipeline` built from `conf` is used. The returned value
    /// therefore always carries `Some` pipeline name, and the subcommand is
    /// passed through untouched.
    fn update_from_conf(self, conf: &XvcConfig) -> XvcConfigResult<Box<Self>> {
        // Read eagerly, exactly as before, so configuration access is unchanged
        // even when a name was supplied explicitly.
        let default_pipeline = XvcPipeline::from_conf(conf);

        // `self` is owned here, so both fields are moved instead of cloned.
        let pipeline_name = Some(self.pipeline_name.unwrap_or(default_pipeline.name));

        Ok(Box::new(Self {
            pipeline_name,
            subcommand: self.subcommand,
        }))
    }
}
/// A pipeline, identified by its name.
///
/// Pipelines are kept in a store keyed by entity; `XvcPipeline::from_name`
/// looks one up by name and `init` creates the initial one.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, PartialOrd, Ord)]
pub struct XvcPipeline {
/// Name of the pipeline; this is the value `from_name` compares against.
pub name: String,
}
impl FromStr for XvcPipeline {
type Err = Error;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
Ok(Self {
name: s.to_string(),
})
}
}
// Registers `XvcPipeline` with the persistence macro under the "xvc-pipeline" label.
// NOTE(review): presumably this is what allows `load_store::<XvcPipeline>()` and
// `save_store` to work for this type — confirm against `xvc_core::persist!`.
persist!(XvcPipeline, "xvc-pipeline");
// Ties `XvcPipeline` to the "pipeline.default" configuration key.
// NOTE(review): presumably this supplies the `XvcPipeline::from_conf` used by
// `PipelineCLI::update_from_conf` — confirm against `xvc_core::conf!`.
conf!(XvcPipeline, "pipeline.default");
/// A run directory recorded for a pipeline.
///
/// An empty store of this type is created by `init`.
// NOTE(review): presumably the directory the pipeline's commands are executed
// in — confirm against the code that reads this store.
#[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct XvcPipelineRunDir {
/// The run directory, expressed as an `XvcPath`.
pub run_dir: XvcPath,
}
// Registers `XvcPipelineRunDir` with the persistence macro under the
// "xvc-pipeline-run-dir" label.
// NOTE(review): presumably required for `save_store(&XvcStore::<XvcPipelineRunDir>::new())`
// in `init` — confirm against `xvc_core::persist!`.
persist!(XvcPipelineRunDir, "xvc-pipeline-run-dir");
impl XvcPipeline {
pub fn from_name(xvc_root: &XvcRoot, name: &str) -> Result<(XvcEntity, Self)> {
let all = xvc_root.load_store::<XvcPipeline>()?;
match all.iter().find(|(_, p)| p.name == name) {
None => Err(Error::NoPipelinesFound {
name: name.to_string(),
}),
Some((e, pipeline)) => Ok((*e, pipeline.to_owned())),
}
}
}
/// Creates and saves the initial pipeline stores for `xvc_root`.
///
/// A single pipeline is inserted into a fresh `XvcPipeline` store. Its name is
/// the value of the `pipeline.default` configuration key, or `"default"` when
/// that key cannot be read. Empty stores are then saved for run directories,
/// steps, step commands, dependencies, outputs and step invalidation.
///
/// # Errors
///
/// Returns the first error reported while saving any of the stores.
pub fn init(xvc_root: &XvcRoot) -> Result<()> {
    let config = xvc_root.config();
    let mut pipelines = XvcStore::<XvcPipeline>::new();

    // A failed lookup is not an error here: fall back to the literal name.
    let name = config
        .get_str("pipeline.default")
        .map(|configured| configured.option)
        .unwrap_or_else(|_| "default".to_string());

    pipelines.insert(xvc_root.new_entity(), XvcPipeline { name });
    xvc_root.save_store(&pipelines)?;

    // The remaining stores start out empty; they are saved in a fixed order.
    xvc_root.save_store(&XvcStore::<XvcPipelineRunDir>::new())?;
    xvc_root.save_store(&XvcStore::<XvcStep>::new())?;
    xvc_root.save_store(&XvcStore::<XvcStepCommand>::new())?;
    xvc_root.save_store(&XvcStore::<XvcDependency>::new())?;
    xvc_root.save_store(&XvcStore::<XvcOutput>::new())?;
    xvc_root.save_store(&XvcStore::<XvcStepInvalidate>::new())?;

    Ok(())
}
pub fn cmd_pipeline<R: BufRead>(
input: R,
output_snd: &XvcOutputSender,
xvc_root: &XvcRoot,
command: PipelineCLI,
) -> Result<()> {
let conf = xvc_root.config();
let command = command.update_from_conf(conf)?;
let pipeline_name = command.pipeline_name.unwrap();
match command.subcommand {
PipelineSubCommand::Run(opts) => cmd_run(output_snd, xvc_root, &pipeline_name, opts),
PipelineSubCommand::New(opts) => cmd_new(xvc_root, &pipeline_name, opts),
PipelineSubCommand::Update(opts) => cmd_update(xvc_root, &pipeline_name, opts),
PipelineSubCommand::List => cmd_list(output_snd, xvc_root),
PipelineSubCommand::Delete(opts) => cmd_delete(xvc_root, &pipeline_name, opts),
PipelineSubCommand::Export(opts) => cmd_export(output_snd, xvc_root, &pipeline_name, opts),
PipelineSubCommand::Dag(opts) => cmd_dag(output_snd, xvc_root, &pipeline_name, opts),
PipelineSubCommand::Import(opts) => cmd_import(input, xvc_root, &pipeline_name, opts),
PipelineSubCommand::Step(step_cli) => {
handle_step_cli(output_snd, xvc_root, &pipeline_name, step_cli)
}
}
}