use super::pipeline_config::{PipelineConfig, ResolvedPipelineConfig};
use biors_core::workflow::SequenceWorkflowOutput;
use serde::Serialize;
use std::path::Path;
/// Serializable report describing a pipeline run, or a dry run of one.
///
/// Built by `PipelineOutput::from_workflow`, `PipelineOutput::dry_run`, and
/// `PipelineOutput::from_config_workflow`. Fields marked
/// `skip_serializing_if = "Option::is_none"` are omitted when `None`.
/// `workflow` has no such attribute, so it is serialized even when it is `None`.
#[derive(Debug, Serialize)]
pub(crate) struct PipelineOutput {
/// Pipeline identifier. This module sets it to `"validate_tokenize_export.v0"`
/// or `"config_pipeline.v0"`.
pipeline: &'static str,
/// Mirrors the workflow's `model_ready` flag. Always `false` for a dry run.
ready: bool,
/// `Some(true)` for a dry run, `Some(false)` for an executed config run, and
/// `None` for the legacy pipeline.
#[serde(skip_serializing_if = "Option::is_none")]
dry_run: Option<bool>,
/// Echo of the caller's explain-plan flag. `None` for the legacy pipeline.
#[serde(skip_serializing_if = "Option::is_none")]
explain_plan: Option<bool>,
/// Summary of the pipeline config. `None` for the legacy pipeline.
#[serde(skip_serializing_if = "Option::is_none")]
config: Option<PipelineConfigSummary>,
/// Per-stage status entries, in stage order.
steps: Vec<PipelineStep>,
/// Stage-by-stage plan. Always present for a dry run. Present for an executed
/// config run only when an explain plan was requested.
#[serde(skip_serializing_if = "Option::is_none")]
plan: Option<PipelinePlan>,
/// Full workflow output when the pipeline actually executed. `None` for a dry run.
pub(crate) workflow: Option<SequenceWorkflowOutput>,
}
/// Status of a single pipeline stage, as listed in `PipelineOutput`'s `steps`.
#[derive(Debug, Serialize)]
struct PipelineStep {
/// Stage name, e.g. `"parse"` or `"export"`.
name: &'static str,
/// One of the values this module assigns: `"planned"`, `"passed"`,
/// `"failed"`, or `"blocked"`.
status: &'static str,
/// Number of records the stage reported. `0` for planned steps.
records: usize,
/// Number of warnings the stage reported.
warning_count: usize,
/// Number of errors the stage reported.
error_count: usize,
/// Hash of the exported data. Set only by the export step, and only when
/// the workflow is model-ready. Omitted from output otherwise.
#[serde(skip_serializing_if = "Option::is_none")]
output_sha256: Option<String>,
}
/// Compact echo of the pipeline config, included in config-driven output.
#[derive(Debug, Serialize)]
struct PipelineConfigSummary {
/// Config schema version, rendered with `to_string()`.
schema_version: String,
/// Pipeline name taken from the config.
name: String,
/// Resolved input path, rendered with `Path::display` (lossy for non-UTF-8 paths).
input: String,
/// Export format taken from the config.
export: String,
}
/// Execution plan for a config-driven pipeline: one entry per stage in `CONFIG_STAGES`.
#[derive(Debug, Clone, Serialize)]
struct PipelinePlan {
/// Config schema version, rendered with `to_string()`.
schema_version: String,
/// Pipeline name taken from the config.
name: String,
/// Resolved input path, rendered with `Path::display`.
input: String,
/// Planned stages, in execution order.
stages: Vec<PipelinePlanStage>,
}
/// One planned stage within a `PipelinePlan`.
#[derive(Debug, Clone, Serialize)]
struct PipelinePlanStage {
/// Stage name, e.g. `"tokenize"`.
name: &'static str,
/// Human-readable description of what the stage does.
operation: &'static str,
/// Stage-specific setting drawn from the resolved config (policy, kind,
/// profile, export limits, or the input path).
detail: String,
}
/// Stages a pipeline can report or plan.
///
/// Variants are declared in the same order that `CONFIG_STAGES` lists them.
#[derive(Debug, Clone, Copy)]
enum PipelineStage {
Parse,
Normalize,
Validate,
Tokenize,
Export,
}
/// Stages reported by the legacy `validate_tokenize_export.v0` pipeline.
/// That pipeline has no separate parse or normalize steps.
const LEGACY_STAGES: &[PipelineStage] = &[
PipelineStage::Validate,
PipelineStage::Tokenize,
PipelineStage::Export,
];
/// Stages reported and planned by the config-driven `config_pipeline.v0`
/// pipeline, in execution order.
const CONFIG_STAGES: &[PipelineStage] = &[
PipelineStage::Parse,
PipelineStage::Normalize,
PipelineStage::Validate,
PipelineStage::Tokenize,
PipelineStage::Export,
];
impl PipelineOutput {
    /// Wraps a workflow run from the legacy, non-config pipeline.
    ///
    /// Only the validate/tokenize/export stages are reported. Every
    /// config-pipeline field (`dry_run`, `explain_plan`, `config`, `plan`) is left unset.
    pub(crate) fn from_workflow(workflow: SequenceWorkflowOutput) -> Self {
        let ready = workflow.model_ready;
        let steps = legacy_steps(&workflow);
        Self {
            pipeline: "validate_tokenize_export.v0",
            ready,
            dry_run: None,
            explain_plan: None,
            config: None,
            steps,
            plan: None,
            workflow: Some(workflow),
        }
    }

    /// Describes what a config-driven run would do, without executing it.
    ///
    /// Every stage is reported as planned and `ready` is always `false`. The
    /// plan is included whether or not `explain_plan` is set.
    pub(crate) fn dry_run(resolved: ResolvedPipelineConfig, explain_plan: bool) -> Self {
        let summary = PipelineConfigSummary::from_config(&resolved.config, &resolved.input_path);
        let plan = pipeline_plan(&resolved);
        Self {
            pipeline: "config_pipeline.v0",
            ready: false,
            dry_run: Some(true),
            explain_plan: Some(explain_plan),
            config: Some(summary),
            steps: planned_steps(),
            plan: Some(plan),
            workflow: None,
        }
    }

    /// Wraps an executed config-driven workflow run.
    ///
    /// The stage plan is attached only when `explain_plan` is `true`.
    pub(crate) fn from_config_workflow(
        resolved: ResolvedPipelineConfig,
        explain_plan: bool,
        workflow: SequenceWorkflowOutput,
    ) -> Self {
        let summary = PipelineConfigSummary::from_config(&resolved.config, &resolved.input_path);
        let plan = if explain_plan {
            Some(pipeline_plan(&resolved))
        } else {
            None
        };
        let ready = workflow.model_ready;
        let steps = config_steps(&workflow);
        Self {
            pipeline: "config_pipeline.v0",
            ready,
            dry_run: Some(false),
            explain_plan: Some(explain_plan),
            config: Some(summary),
            steps,
            plan,
            workflow: Some(workflow),
        }
    }
}
impl PipelineConfigSummary {
    /// Builds the config echo for pipeline output from a parsed config and
    /// the input path it resolved to.
    fn from_config(config: &PipelineConfig, input_path: &Path) -> Self {
        let schema_version = config.schema_version.to_string();
        let input = input_path.display().to_string();
        Self {
            name: config.name.clone(),
            export: config.export.format.clone(),
            schema_version,
            input,
        }
    }
}
fn legacy_steps(workflow: &SequenceWorkflowOutput) -> Vec<PipelineStep> {
executed_steps(LEGACY_STAGES, workflow)
}
fn planned_steps() -> Vec<PipelineStep> {
CONFIG_STAGES
.iter()
.map(|stage| stage.planned_step())
.collect()
}
fn config_steps(workflow: &SequenceWorkflowOutput) -> Vec<PipelineStep> {
executed_steps(CONFIG_STAGES, workflow)
}
/// Builds an executed-step report for each stage in `stages`, preserving their order.
fn executed_steps(
    stages: &[PipelineStage],
    workflow: &SequenceWorkflowOutput,
) -> Vec<PipelineStep> {
    let report = |stage: PipelineStage| stage.executed_step(workflow);
    stages.iter().copied().map(report).collect()
}
/// Maps an error count to a step status: `"passed"` when there are no
/// errors, `"failed"` otherwise.
fn status_from_errors(error_count: usize) -> &'static str {
    match error_count {
        0 => "passed",
        _ => "failed",
    }
}
impl PipelineStage {
fn name(self) -> &'static str {
match self {
Self::Parse => "parse",
Self::Normalize => "normalize",
Self::Validate => "validate",
Self::Tokenize => "tokenize",
Self::Export => "export",
}
}
fn operation(self) -> &'static str {
match self {
Self::Parse => "parse FASTA input",
Self::Normalize => "normalize sequence records",
Self::Validate => "validate biological alphabet",
Self::Tokenize => "tokenize normalized records",
Self::Export => "export model-ready JSON",
}
}
fn plan_detail(self, resolved: &ResolvedPipelineConfig) -> String {
match self {
Self::Parse => format!("read {}", resolved.input_path.display()),
Self::Normalize => resolved.config.normalize.policy.clone(),
Self::Validate => resolved.config.validate.kind.clone(),
Self::Tokenize => resolved.config.tokenize.profile.clone(),
Self::Export => format!(
"max_length={}, padding={}",
resolved.config.export.max_length, resolved.config.export.padding
),
}
}
fn planned_step(self) -> PipelineStep {
PipelineStep {
name: self.name(),
status: "planned",
records: 0,
warning_count: 0,
error_count: 0,
output_sha256: None,
}
}
fn executed_step(self, workflow: &SequenceWorkflowOutput) -> PipelineStep {
match self {
Self::Parse | Self::Normalize => PipelineStep {
name: self.name(),
status: "passed",
records: workflow.validation.records,
warning_count: 0,
error_count: 0,
output_sha256: None,
},
Self::Validate => PipelineStep {
name: self.name(),
status: status_from_errors(workflow.validation.error_count),
records: workflow.validation.records,
warning_count: workflow.validation.warning_count,
error_count: workflow.validation.error_count,
output_sha256: None,
},
Self::Tokenize => {
let summary = &workflow.tokenization.summary;
PipelineStep {
name: self.name(),
status: status_from_errors(summary.error_count),
records: summary.records,
warning_count: summary.warning_count,
error_count: summary.error_count,
output_sha256: None,
}
}
Self::Export => export_step(workflow),
}
}
fn plan_stage(self, resolved: &ResolvedPipelineConfig) -> PipelinePlanStage {
PipelinePlanStage {
name: self.name(),
operation: self.operation(),
detail: self.plan_detail(resolved),
}
}
}
/// Summarizes the export stage of an executed workflow.
///
/// When the workflow is model-ready, the step is `"passed"` with no errors
/// and carries the output data hash. Otherwise it is `"blocked"`, its error
/// count is the number of readiness issues, and no hash is reported.
/// `records` counts model-input records, falling back to `0` when there is
/// no model input.
fn export_step(workflow: &SequenceWorkflowOutput) -> PipelineStep {
    let records = workflow
        .model_input
        .as_ref()
        .map_or(0, |input| input.records.len());
    let (status, error_count, output_sha256) = if workflow.model_ready {
        (
            "passed",
            0,
            Some(workflow.provenance.hashes.output_data_sha256.clone()),
        )
    } else {
        ("blocked", workflow.readiness_issues.len(), None)
    };
    PipelineStep {
        name: PipelineStage::Export.name(),
        status,
        records,
        warning_count: 0,
        error_count,
        output_sha256,
    }
}
/// Builds the full execution plan for a resolved config, with one entry per
/// stage in `CONFIG_STAGES`, in order.
fn pipeline_plan(resolved: &ResolvedPipelineConfig) -> PipelinePlan {
    let config = &resolved.config;
    let mut stages = Vec::with_capacity(CONFIG_STAGES.len());
    for &stage in CONFIG_STAGES {
        stages.push(stage.plan_stage(resolved));
    }
    PipelinePlan {
        schema_version: config.schema_version.to_string(),
        name: config.name.clone(),
        input: resolved.input_path.display().to_string(),
        stages,
    }
}