use std::path::PathBuf;
use serde::{Deserialize, Serialize};
/// End-to-end configuration for a single pipeline run.
///
/// Build via [`Default`] or the `with_*` builder methods, then check
/// cross-field invariants with `validate` before running.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineConfig {
/// Optional human-readable name for this pipeline run.
pub name: Option<String>,
/// Path to the staging database file (defaults to "staging.duckdb").
pub database: PathBuf,
/// Input data location; required when the ingest stage runs.
pub source: Option<PathBuf>,
/// Glob pattern used to select input files (defaults to "*.json").
pub pattern: String,
/// Optional partition specifier for the staged data.
/// NOTE(review): exact semantics not visible in this module — confirm with consumers.
pub partition: Option<String>,
/// Directory where pipeline artifacts are written (defaults to "output").
pub output_dir: PathBuf,
/// Path to a target schema file; required when the map stage runs.
pub target_schema: Option<PathBuf>,
/// LLM-related settings (see [`LlmPipelineConfig`]).
pub llm: LlmPipelineConfig,
/// Stages to run; an empty list means "run all stages".
pub stages: Vec<PipelineStage>,
/// When true, plan without executing.
/// NOTE(review): enforcement happens outside this module — confirm.
pub dry_run: bool,
/// When true, resume a previously interrupted run.
/// NOTE(review): enforcement happens outside this module — confirm.
pub resume: bool,
/// Enable verbose output.
pub verbose: bool,
}
impl Default for PipelineConfig {
fn default() -> Self {
Self {
name: None,
database: PathBuf::from("staging.duckdb"),
source: None,
pattern: "*.json".to_string(),
partition: None,
output_dir: PathBuf::from("output"),
target_schema: None,
llm: LlmPipelineConfig::default(),
stages: Vec::new(),
dry_run: false,
resume: false,
verbose: false,
}
}
}
impl PipelineConfig {
    /// Creates a configuration with all defaults; identical to `Self::default()`.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the staging database path.
    #[must_use]
    pub fn with_database(mut self, path: impl Into<PathBuf>) -> Self {
        self.database = path.into();
        self
    }

    /// Sets the input source path (required for the ingest stage).
    #[must_use]
    pub fn with_source(mut self, path: impl Into<PathBuf>) -> Self {
        self.source = Some(path.into());
        self
    }

    /// Sets the glob pattern used to select input files.
    #[must_use]
    pub fn with_pattern(mut self, pattern: impl Into<String>) -> Self {
        self.pattern = pattern.into();
        self
    }

    /// Sets the partition specifier.
    #[must_use]
    pub fn with_partition(mut self, partition: impl Into<String>) -> Self {
        self.partition = Some(partition.into());
        self
    }

    /// Sets the output directory.
    #[must_use]
    pub fn with_output_dir(mut self, path: impl Into<PathBuf>) -> Self {
        self.output_dir = path.into();
        self
    }

    /// Sets the target schema path (required for the map stage).
    #[must_use]
    pub fn with_target_schema(mut self, path: impl Into<PathBuf>) -> Self {
        self.target_schema = Some(path.into());
        self
    }

    /// Replaces the LLM configuration.
    #[must_use]
    pub fn with_llm(mut self, llm: LlmPipelineConfig) -> Self {
        self.llm = llm;
        self
    }

    /// Selects an explicit set of stages (empty = run all).
    #[must_use]
    pub fn with_stages(mut self, stages: Vec<PipelineStage>) -> Self {
        self.stages = stages;
        self
    }

    /// Enables or disables dry-run mode.
    #[must_use]
    pub fn with_dry_run(mut self, dry_run: bool) -> Self {
        self.dry_run = dry_run;
        self
    }

    /// Enables or disables resuming a previous run.
    #[must_use]
    pub fn with_resume(mut self, resume: bool) -> Self {
        self.resume = resume;
        self
    }

    /// Enables or disables verbose output.
    #[must_use]
    pub fn with_verbose(mut self, verbose: bool) -> Self {
        self.verbose = verbose;
        self
    }

    /// Stages that will actually run: the explicit selection, or every
    /// stage when none was chosen.
    pub fn effective_stages(&self) -> Vec<PipelineStage> {
        if self.stages.is_empty() {
            PipelineStage::all()
        } else {
            self.stages.clone()
        }
    }

    /// Whether `stage` is part of this run; an empty selection means all
    /// stages run.
    pub fn should_run_stage(&self, stage: PipelineStage) -> bool {
        self.stages.is_empty() || self.stages.contains(&stage)
    }

    /// Checks cross-field invariants, returning a human-readable message
    /// for the first violation found.
    ///
    /// # Errors
    ///
    /// * The ingest stage is selected (or implied by an empty selection)
    ///   but `source` is `None`.
    /// * The map stage is selected but `target_schema` is `None`.
    ///
    /// NOTE(review): with an empty stage selection the map stage counts as
    /// running, so a default config also requires `target_schema` even
    /// though `PipelineStage::Map.is_optional()` is true — confirm this is
    /// the intended behavior.
    pub fn validate(&self) -> Result<(), String> {
        if self.should_run_stage(PipelineStage::Ingest) && self.source.is_none() {
            return Err("Source path is required for ingest stage".to_string());
        }
        if self.should_run_stage(PipelineStage::Map) && self.target_schema.is_none() {
            return Err("Target schema is required for map stage".to_string());
        }
        Ok(())
    }
}
/// LLM settings used by the pipeline.
///
/// `mode` is one of `"none"`, `"online"`, or `"offline"` (see the
/// constructors below); any other value also counts as enabled.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LlmPipelineConfig {
/// Operating mode string; "none" disables the LLM entirely.
pub mode: String,
/// Base URL of the Ollama server (defaults to "http://localhost:11434").
pub ollama_url: String,
/// Model name used in online mode (defaults to "llama3.2").
pub model: String,
/// Local model file path, set by the offline constructor.
pub model_path: Option<PathBuf>,
/// Optional path to a supplementary document.
/// NOTE(review): how this is consumed is not visible here — confirm.
pub doc_path: Option<PathBuf>,
/// Sampling temperature (defaults to 0.3).
pub temperature: f32,
}
impl Default for LlmPipelineConfig {
fn default() -> Self {
Self {
mode: "none".to_string(),
ollama_url: "http://localhost:11434".to_string(),
model: "llama3.2".to_string(),
model_path: None,
doc_path: None,
temperature: 0.3,
}
}
}
impl LlmPipelineConfig {
    /// True unless the mode is the sentinel string "none".
    pub fn is_enabled(&self) -> bool {
        !matches!(self.mode.as_str(), "none")
    }

    /// Online (Ollama-backed) configuration for the given model name;
    /// every other field keeps its default.
    pub fn online(model: impl Into<String>) -> Self {
        let mut cfg = Self::default();
        cfg.mode = "online".to_string();
        cfg.model = model.into();
        cfg
    }

    /// Offline configuration backed by a local model file; every other
    /// field keeps its default.
    pub fn offline(model_path: impl Into<PathBuf>) -> Self {
        let mut cfg = Self::default();
        cfg.mode = "offline".to_string();
        cfg.model_path = Some(model_path.into());
        cfg
    }
}
/// The six pipeline stages, in execution order.
///
/// Serialized in lowercase (e.g. `"ingest"`); also parseable from the
/// 1-based stage number via `FromStr` ("1" through "6").
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum PipelineStage {
/// Ingest data into the staging database.
Ingest,
/// Infer a schema from the staged data.
Infer,
/// Refine the schema with an LLM.
Refine,
/// Map to the target schema.
Map,
/// Export to Parquet/target format.
Export,
/// Generate ODCS contracts.
Generate,
}
impl PipelineStage {
pub fn all() -> Vec<Self> {
vec![
Self::Ingest,
Self::Infer,
Self::Refine,
Self::Map,
Self::Export,
Self::Generate,
]
}
pub fn name(&self) -> &'static str {
match self {
Self::Ingest => "ingest",
Self::Infer => "infer",
Self::Refine => "refine",
Self::Map => "map",
Self::Export => "export",
Self::Generate => "generate",
}
}
pub fn description(&self) -> &'static str {
match self {
Self::Ingest => "Ingest data into staging database",
Self::Infer => "Infer schema from staged data",
Self::Refine => "Refine schema with LLM",
Self::Map => "Map to target schema",
Self::Export => "Export to Parquet/target format",
Self::Generate => "Generate ODCS contracts",
}
}
pub fn index(&self) -> usize {
match self {
Self::Ingest => 1,
Self::Infer => 2,
Self::Refine => 3,
Self::Map => 4,
Self::Export => 5,
Self::Generate => 6,
}
}
pub fn is_optional(&self) -> bool {
matches!(self, Self::Refine | Self::Map)
}
}
impl std::fmt::Display for PipelineStage {
    /// Writes the lowercase stage name (same as [`PipelineStage::name`]).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.name())
    }
}
impl std::str::FromStr for PipelineStage {
    type Err = String;

    /// Parses a stage from its lowercase name (case-insensitively) or its
    /// 1-based stage number ("1" through "6").
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let normalized = s.to_lowercase();
        Self::all()
            .into_iter()
            .find(|stage| {
                normalized == stage.name() || normalized == stage.index().to_string()
            })
            .ok_or_else(|| format!("Unknown stage: {}", s))
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Default config carries the documented database path and pattern.
    #[test]
    fn test_pipeline_config_default() {
        let cfg = PipelineConfig::default();
        assert!(!cfg.dry_run);
        assert_eq!(cfg.pattern, "*.json");
        assert_eq!(cfg.database, PathBuf::from("staging.duckdb"));
    }

    /// Builder methods set the corresponding fields.
    #[test]
    fn test_pipeline_config_builder() {
        let cfg = PipelineConfig::new()
            .with_database("test.duckdb")
            .with_source("/data/input")
            .with_output_dir("/data/output")
            .with_dry_run(true);
        assert_eq!(cfg.database, PathBuf::from("test.duckdb"));
        assert_eq!(cfg.source, Some(PathBuf::from("/data/input")));
        assert!(cfg.dry_run);
    }

    /// An empty selection expands to all six stages; an explicit one does not.
    #[test]
    fn test_effective_stages() {
        let implicit = PipelineConfig::default();
        assert_eq!(implicit.effective_stages().len(), 6);

        let explicit = PipelineConfig::default()
            .with_stages(vec![PipelineStage::Ingest, PipelineStage::Infer]);
        assert_eq!(explicit.effective_stages().len(), 2);
    }

    /// Stages parse from names and 1-based numbers; garbage is rejected.
    #[test]
    fn test_pipeline_stage_parse() {
        let by_name: PipelineStage = "ingest".parse().unwrap();
        assert_eq!(by_name, PipelineStage::Ingest);

        let by_number: PipelineStage = "1".parse().unwrap();
        assert_eq!(by_number, PipelineStage::Ingest);

        let refine: PipelineStage = "refine".parse().unwrap();
        assert_eq!(refine, PipelineStage::Refine);

        assert!("invalid".parse::<PipelineStage>().is_err());
    }

    /// Index and optionality flags are as documented.
    #[test]
    fn test_pipeline_stage_properties() {
        assert_eq!(PipelineStage::Ingest.index(), 1);
        assert!(!PipelineStage::Ingest.is_optional());
        assert!(PipelineStage::Refine.is_optional());
        assert!(PipelineStage::Map.is_optional());
    }

    /// The default LLM config is disabled; the online constructor enables it.
    #[test]
    fn test_llm_config() {
        assert!(!LlmPipelineConfig::default().is_enabled());

        let online = LlmPipelineConfig::online("llama3.2");
        assert_eq!(online.mode, "online");
        assert!(online.is_enabled());
    }

    /// validate() requires a source for ingest and a schema for map.
    #[test]
    fn test_config_validation() {
        // Default runs all stages but has no source: rejected.
        assert!(PipelineConfig::default().validate().is_err());

        // Source present and only non-map stages selected: accepted.
        let ok = PipelineConfig::default()
            .with_source("/data")
            .with_stages(vec![PipelineStage::Ingest, PipelineStage::Infer]);
        assert!(ok.validate().is_ok());

        // Map selected without a target schema: rejected.
        let missing_schema = PipelineConfig::default()
            .with_source("/data")
            .with_stages(vec![PipelineStage::Map]);
        assert!(missing_schema.validate().is_err());
    }
}