xvc_pipeline/
lib.rs

1//! Pipeline management commands and data structures
2//!
3//! This contains CLI structs for `xvc pipeline` subcommands, [`init`] function to
4//! run during `xvc init` for pipeline related initialization, [`cmd_pipeline`]
5//! and [`handle_step_cli`] functions to dispatch the options to subcommands.
6#![warn(missing_docs)]
7#![forbid(unsafe_code)]
8pub mod error;
9mod pipeline;
10
11pub use crate::pipeline::api::{
12    dag::cmd_dag, delete::cmd_delete, export::cmd_export, import::cmd_import, list::cmd_list,
13    new::cmd_new, run::cmd_run, step_dependency::cmd_step_dependency, step_new::cmd_step_new,
14    step_output::cmd_step_output, step_show::cmd_step_show, step_update::cmd_step_update,
15    update::cmd_update,
16};
17
18use clap::Parser;
19
20use clap_complete::ArgValueCompleter;
21use pipeline::api::dag::DagCLI;
22use pipeline::api::delete::DeleteCLI;
23use pipeline::api::export::ExportCLI;
24use pipeline::api::import::ImportCLI;
25use pipeline::api::new::NewCLI;
26use pipeline::api::update::UpdateCLI;
27pub use pipeline::deps;
28
29use pipeline::step::handle_step_cli;
30use pipeline::step::StepCLI;
31use pipeline::util::pipeline_name_completer;
32use serde::{Deserialize, Serialize};
33use std::io::BufRead;
34use std::str::FromStr;
35use xvc_core::XvcConfigResult;
36use xvc_core::XvcOutputSender;
37use xvc_core::XvcStore;
38use xvc_core::{conf, FromConfigKey, UpdateFromXvcConfig, XvcConfig};
39
40use xvc_core::XvcPath;
41use xvc_core::XvcRoot;
42use xvc_core::{self, persist, XvcEntity};
43
44use crate::error::{Error, Result};
45pub use crate::pipeline::command::CommandProcess;
46pub use crate::pipeline::command::XvcStepCommand;
47pub use crate::pipeline::deps::{param::XvcParamFormat, XvcDependency};
48pub use crate::pipeline::outs::XvcMetricsFormat;
49pub use crate::pipeline::outs::XvcOutput;
50pub use crate::pipeline::schema::XvcPipelineSchema;
51pub use crate::pipeline::schema::XvcStepSchema;
52pub use crate::pipeline::step::XvcStep;
53use crate::pipeline::XvcStepInvalidate;
54
55pub use crate::pipeline::api::run::RunCLI;
56
57/// Pipeline management commands
58#[derive(Debug, Parser, Clone)]
59#[command(name = "pipeline")]
60pub struct PipelineCLI {
61    /// Name of the pipeline this command applies to
62    #[arg(long, short, global = true, add = ArgValueCompleter::new(pipeline_name_completer))]
63    pub pipeline_name: Option<String>,
64
65    /// Subcommand to run
66    #[command(subcommand)]
67    pub subcommand: PipelineSubCommand,
68}
69
70/// Pipeline management subcommands
71#[derive(Debug, Clone, Parser)]
72#[command()]
73#[allow(clippy::large_enum_variant)]
74pub enum PipelineSubCommand {
75    /// Create a new pipeline
76    #[command(visible_aliases=&["n"])]
77    New(NewCLI),
78
79    /// Update the name and other attributes of a pipeline
80    #[command(visible_aliases=&["u"])]
81    Update(UpdateCLI),
82
83    /// Delete a pipeline
84    #[command(visible_aliases=&["D"])]
85    Delete(DeleteCLI),
86
87    /// Run a pipeline
88    #[command(visible_aliases=&["r"])]
89    Run(RunCLI),
90
91    /// List all pipelines
92    #[command(visible_aliases=&["l"])]
93    List,
94
95    /// Generate a Graphviz or mermaid diagram of the pipeline
96    #[command(visible_aliases=&["d"])]
97    Dag(DagCLI),
98
99    /// Export the pipeline to a YAML or JSON file to edit
100    #[command(visible_aliases=&["e"])]
101    Export(ExportCLI),
102
103    /// Import the pipeline from a file
104    #[command(visible_aliases=&["i"])]
105    Import(ImportCLI),
106
107    /// Step creation, dependency, output commands
108    #[command(visible_aliases=&["s"])]
109    Step(StepCLI),
110}
111
112impl UpdateFromXvcConfig for PipelineCLI {
113    fn update_from_conf(self, conf: &XvcConfig) -> XvcConfigResult<Box<Self>> {
114        let default_pipeline = XvcPipeline::from_conf(conf);
115        let name = Some(self.pipeline_name.clone().unwrap_or(default_pipeline.name));
116        Ok(Box::new(Self {
117            pipeline_name: name,
118            subcommand: self.subcommand.clone(),
119        }))
120    }
121}
122
123/// A pipeline is a collection of steps that are run in a specific order.
124/// This struct defines the name of it.
125#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, PartialOrd, Ord)]
126pub struct XvcPipeline {
127    /// The name of the pipeline, that's also the unique ID
128    pub name: String,
129}
130
131impl FromStr for XvcPipeline {
132    type Err = Error;
133    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
134        Ok(Self {
135            name: s.to_string(),
136        })
137    }
138}
139
140persist!(XvcPipeline, "xvc-pipeline");
141conf!(XvcPipeline, "pipeline.default");
142
143/// A pipeline run directory where the pipeline is run.
144/// It should be within the workspace to be portable across systems.
145#[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Ord, Serialize, Deserialize)]
146pub struct XvcPipelineRunDir {
147    /// The directory to run the command relative to xvc_root
148    pub run_dir: XvcPath,
149}
150
151persist!(XvcPipelineRunDir, "xvc-pipeline-run-dir");
152
153impl XvcPipeline {
154    /// Load a pipeline by name.
155    ///
156    /// Returns the entity and the pipeline if found. Otherwise returns [Error::NoPipelinesFound].
157    pub fn from_name(xvc_root: &XvcRoot, name: &str) -> Result<(XvcEntity, Self)> {
158        let all = xvc_root.load_store::<XvcPipeline>()?;
159        match all.iter().find(|(_, p)| p.name == name) {
160            None => Err(Error::NoPipelinesFound {
161                name: name.to_string(),
162            }),
163            Some((e, pipeline)) => Ok((*e, pipeline.to_owned())),
164        }
165    }
166}
167
168/// Initialize pipeline stores and save them.
169///
170/// This is to run during `xvc init`.
171pub fn init(xvc_root: &XvcRoot) -> Result<()> {
172    let conf = xvc_root.config();
173    let mut pipeline_store = XvcStore::<XvcPipeline>::new();
174    // If there is a system config for default pipeline name, adhere to it
175    let initial_name = if let Ok(config_opt) = conf.get_str("pipeline.default") {
176        config_opt.option
177    } else {
178        "default".to_string()
179    };
180
181    pipeline_store.insert(xvc_root.new_entity(), XvcPipeline { name: initial_name });
182
183    xvc_root.save_store(&pipeline_store)?;
184    xvc_root.save_store(&XvcStore::<XvcPipelineRunDir>::new())?;
185
186    xvc_root.save_store(&XvcStore::<XvcStep>::new())?;
187    xvc_root.save_store(&XvcStore::<XvcStepCommand>::new())?;
188    xvc_root.save_store(&XvcStore::<XvcDependency>::new())?;
189    xvc_root.save_store(&XvcStore::<XvcOutput>::new())?;
190    xvc_root.save_store(&XvcStore::<XvcStepInvalidate>::new())?;
191
192    Ok(())
193}
194
195/// Run `xvc pipeline` command.
196/// This is the entry point for the pipeline subcommand.
197/// It dispatches to the subcommands using [PipelineCLI] argument.
198pub fn cmd_pipeline<R: BufRead>(
199    input: R,
200    output_snd: &XvcOutputSender,
201    xvc_root: &XvcRoot,
202    command: PipelineCLI,
203) -> Result<()> {
204    let conf = xvc_root.config();
205    let command = command.update_from_conf(conf)?;
206    // This should already be filled from the conf if not given
207    let pipeline_name = command.pipeline_name.unwrap();
208    match command.subcommand {
209        PipelineSubCommand::Run(opts) => cmd_run(output_snd, xvc_root, &pipeline_name, opts),
210        PipelineSubCommand::New(opts) => cmd_new(xvc_root, &pipeline_name, opts),
211        PipelineSubCommand::Update(opts) => cmd_update(xvc_root, &pipeline_name, opts),
212        PipelineSubCommand::List => cmd_list(output_snd, xvc_root),
213        PipelineSubCommand::Delete(opts) => cmd_delete(xvc_root, &pipeline_name, opts),
214        PipelineSubCommand::Export(opts) => cmd_export(output_snd, xvc_root, &pipeline_name, opts),
215        PipelineSubCommand::Dag(opts) => cmd_dag(output_snd, xvc_root, &pipeline_name, opts),
216        PipelineSubCommand::Import(opts) => cmd_import(input, xvc_root, &pipeline_name, opts),
217        PipelineSubCommand::Step(step_cli) => {
218            handle_step_cli(output_snd, xvc_root, &pipeline_name, step_cli)
219        }
220    }
221}