use anyhow::Result;
use rusqlite::{params, OptionalExtension};
use serde::{Deserialize, Serialize};
use std::fs;
use std::path::PathBuf;
use super::BioVaultDb;
use crate::pipeline_spec::PipelineSpec;
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Pipeline {
pub id: i64,
pub name: String,
pub pipeline_path: String,
pub created_at: String,
pub updated_at: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub spec: Option<PipelineSpec>,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct PipelineRun {
pub id: i64,
pub pipeline_id: i64,
pub status: String,
pub work_dir: String,
pub results_dir: Option<String>,
pub created_at: String,
pub completed_at: Option<String>,
}
impl BioVaultDb {
pub fn list_pipelines(&self) -> Result<Vec<Pipeline>> {
let mut stmt = self.conn.prepare(
"SELECT id, name, pipeline_path, created_at, updated_at
FROM pipelines
ORDER BY created_at DESC",
)?;
let mut pipelines = stmt
.query_map([], |row| {
Ok(Pipeline {
id: row.get(0)?,
name: row.get(1)?,
pipeline_path: row.get(2)?,
created_at: row.get(3)?,
updated_at: row.get(4)?,
spec: None,
})
})?
.collect::<Result<Vec<_>, _>>()?;
for pipeline in &mut pipelines {
let yaml_path = PathBuf::from(&pipeline.pipeline_path).join("pipeline.yaml");
if yaml_path.exists() {
match fs::read_to_string(&yaml_path) {
Ok(content) => {
match serde_yaml::from_str::<PipelineSpec>(&content) {
Ok(spec) => {
pipeline.spec = Some(spec);
}
Err(e) => {
eprintln!("Warning: Failed to parse pipeline.yaml for '{}': {}", pipeline.name, e);
eprintln!(" Path: {}", yaml_path.display());
}
}
}
Err(e) => {
eprintln!("Warning: Failed to read pipeline.yaml for '{}': {}", pipeline.name, e);
}
}
}
}
Ok(pipelines)
}
pub fn get_pipeline(&self, pipeline_id: i64) -> Result<Option<Pipeline>> {
let mut stmt = self.conn.prepare(
"SELECT id, name, pipeline_path, created_at, updated_at
FROM pipelines
WHERE id = ?1",
)?;
let pipeline = stmt
.query_row([pipeline_id], |row| {
Ok(Pipeline {
id: row.get(0)?,
name: row.get(1)?,
pipeline_path: row.get(2)?,
created_at: row.get(3)?,
updated_at: row.get(4)?,
spec: None,
})
})
.optional()?;
if let Some(mut p) = pipeline {
let yaml_path = PathBuf::from(&p.pipeline_path).join("pipeline.yaml");
if yaml_path.exists() {
match fs::read_to_string(&yaml_path) {
Ok(content) => {
match serde_yaml::from_str::<PipelineSpec>(&content) {
Ok(spec) => {
p.spec = Some(spec);
}
Err(e) => {
eprintln!("Warning: Failed to parse pipeline.yaml: {}", e);
eprintln!(" Path: {}", yaml_path.display());
}
}
}
Err(e) => {
eprintln!("Warning: Failed to read pipeline.yaml: {}", e);
}
}
}
Ok(Some(p))
} else {
Ok(None)
}
}
pub fn register_pipeline(&self, name: &str, pipeline_path: &str) -> Result<i64> {
self.conn.execute(
"INSERT INTO pipelines (name, pipeline_path, created_at, updated_at)
VALUES (?1, ?2, datetime('now'), datetime('now'))",
params![name, pipeline_path],
)?;
Ok(self.conn.last_insert_rowid())
}
pub fn touch_pipeline(&self, pipeline_id: i64) -> Result<()> {
self.conn.execute(
"UPDATE pipelines SET updated_at = datetime('now') WHERE id = ?1",
params![pipeline_id],
)?;
Ok(())
}
pub fn delete_pipeline(&self, pipeline_id: i64) -> Result<()> {
self.conn
.execute("DELETE FROM pipelines WHERE id = ?1", params![pipeline_id])?;
Ok(())
}
pub fn list_pipeline_runs(&self) -> Result<Vec<PipelineRun>> {
let mut stmt = self.conn.prepare(
"SELECT id, pipeline_id, status, work_dir, results_dir, created_at, completed_at
FROM pipeline_runs
ORDER BY created_at DESC",
)?;
let runs = stmt
.query_map([], |row| {
Ok(PipelineRun {
id: row.get(0)?,
pipeline_id: row.get(1)?,
status: row.get(2)?,
work_dir: row.get(3)?,
results_dir: row.get(4)?,
created_at: row.get(5)?,
completed_at: row.get(6)?,
})
})?
.collect::<Result<Vec<_>, _>>()?;
Ok(runs)
}
pub fn get_pipeline_run(&self, run_id: i64) -> Result<Option<PipelineRun>> {
let mut stmt = self.conn.prepare(
"SELECT id, pipeline_id, status, work_dir, results_dir, created_at, completed_at
FROM pipeline_runs
WHERE id = ?1",
)?;
let run = stmt
.query_row([run_id], |row| {
Ok(PipelineRun {
id: row.get(0)?,
pipeline_id: row.get(1)?,
status: row.get(2)?,
work_dir: row.get(3)?,
results_dir: row.get(4)?,
created_at: row.get(5)?,
completed_at: row.get(6)?,
})
})
.optional()?;
Ok(run)
}
pub fn create_pipeline_run(
&self,
pipeline_id: i64,
work_dir: &str,
results_dir: Option<&str>,
) -> Result<i64> {
self.conn.execute(
"INSERT INTO pipeline_runs (pipeline_id, status, work_dir, results_dir, created_at)
VALUES (?1, 'running', ?2, ?3, datetime('now'))",
params![pipeline_id, work_dir, results_dir],
)?;
Ok(self.conn.last_insert_rowid())
}
pub fn update_pipeline_run_status(
&self,
run_id: i64,
status: &str,
completed: bool,
) -> Result<()> {
if completed {
self.conn.execute(
"UPDATE pipeline_runs SET status = ?1, completed_at = datetime('now') WHERE id = ?2",
params![status, run_id],
)?;
} else {
self.conn.execute(
"UPDATE pipeline_runs SET status = ?1 WHERE id = ?2",
params![status, run_id],
)?;
}
Ok(())
}
pub fn delete_pipeline_run(&self, run_id: i64) -> Result<()> {
self.conn
.execute("DELETE FROM pipeline_runs WHERE id = ?1", params![run_id])?;
Ok(())
}
}