use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use std::path::Path;
/// Persisted pipeline state for one design document, stored as JSON beside it
/// (see `pipeline_path_for_doc`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineState {
    /// On-disk schema version; `create_initial_pipeline` writes 1.
    pub schema_version: u32,
    /// Path of the design document this state belongs to (lossy string form).
    pub design_doc: String,
    /// Hash of the doc contents in `"sha256:<hex>"` form (see `compute_doc_hash`).
    pub doc_hash: String,
    /// Current stage; the code uses "designed", "planning", "planned",
    /// "running", and "complete" (free-form string, other values pass through).
    pub stage: String,
    /// History of planning attempts; defaults to empty for older files.
    #[serde(default)]
    pub plans: Vec<PlanRecord>,
    /// History of execution runs; defaults to empty for older files.
    #[serde(default)]
    pub runs: Vec<RunRecord>,
}
/// One planning attempt by an agent, appended by `mark_planning` and
/// completed in place by `mark_planned`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PlanRecord {
    /// Identifier of the agent that ran the planning attempt.
    pub agent_id: String,
    /// Worktree the attempt ran in.
    pub worktree: String,
    /// RFC 3339 timestamp set when the attempt starts.
    pub started_at: String,
    /// RFC 3339 timestamp set by `mark_planned`; None while still running.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub completed_at: Option<String>,
    /// "running" at creation, set to "done" by `mark_planned`.
    pub status: String,
    /// Count of blocking gaps found by the plan (0 until completed).
    #[serde(default)]
    pub blocking_gaps: u32,
    /// Count of advisory gaps found by the plan (0 until completed).
    #[serde(default)]
    pub advisory_gaps: u32,
    /// Path to the produced plan file, once the attempt completes.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub plan_file: Option<String>,
}
/// One execution run by an agent, appended by `mark_running`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RunRecord {
    /// Identifier of the agent performing the run.
    pub agent_id: String,
    /// Worktree the run executes in.
    pub worktree: String,
    /// Optional issue the run is associated with.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub issue_id: Option<i64>,
    /// RFC 3339 timestamp set when the run starts.
    pub started_at: String,
    /// RFC 3339 completion timestamp; None while still running.
    /// NOTE(review): nothing in this file sets it — presumably a caller does.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub completed_at: Option<String>,
    /// "running" at creation; other values presumably set elsewhere.
    pub status: String,
}
/// Hashes `content` with SHA-256 and returns it as `"sha256:<lowercase-hex>"`.
pub fn compute_doc_hash(content: &str) -> String {
    // One-shot digest; equivalent to new() + update() + finalize().
    let digest = Sha256::digest(content.as_bytes());
    format!("sha256:{digest:x}")
}
/// Returns true when the design doc on disk no longer matches the hash
/// recorded in `pipeline`. An unreadable doc counts as not stale (best effort).
pub fn is_plan_stale(pipeline: &PipelineState, design_doc_path: &Path) -> bool {
    match std::fs::read_to_string(design_doc_path) {
        Ok(text) => compute_doc_hash(&text) != pipeline.doc_hash,
        Err(_) => false,
    }
}
/// Builds a sibling path `<stem>.<suffix>` next to `doc_path`.
/// Paths without a UTF-8 file stem fall back to the stem "unknown".
fn doc_sibling_path(doc_path: &Path, suffix: &str) -> std::path::PathBuf {
    let stem = doc_path
        .file_stem()
        .and_then(|s| s.to_str())
        .unwrap_or("unknown");
    doc_path.with_file_name(format!("{stem}.{suffix}"))
}

/// Path of the pipeline-state JSON for a design doc: `<stem>.pipeline.json`.
pub fn pipeline_path_for_doc(doc_path: &Path) -> std::path::PathBuf {
    doc_sibling_path(doc_path, "pipeline.json")
}

/// Path of the plan JSON for a design doc: `<stem>.plan.json`.
pub fn plan_path_for_doc(doc_path: &Path) -> std::path::PathBuf {
    doc_sibling_path(doc_path, "plan.json")
}
/// Loads the pipeline state stored beside `doc_path`.
/// Returns None when the file is missing, unreadable, or not valid JSON.
pub fn read_pipeline_state(doc_path: &Path) -> Option<PipelineState> {
    std::fs::read_to_string(pipeline_path_for_doc(doc_path))
        .ok()
        .and_then(|raw| serde_json::from_str(&raw).ok())
}
/// Serializes `state` as pretty JSON and writes it beside `doc_path`.
///
/// # Errors
/// Fails if serialization fails or the file cannot be written.
pub fn write_pipeline_state(doc_path: &Path, state: &PipelineState) -> Result<()> {
    let target = pipeline_path_for_doc(doc_path);
    let body = serde_json::to_string_pretty(state).context("Failed to serialize pipeline state")?;
    std::fs::write(&target, body)
        .with_context(|| format!("Failed to write pipeline state to {}", target.display()))
}
/// Creates a fresh "designed"-stage pipeline state for `doc_path`,
/// persists it to disk, and returns it.
///
/// # Errors
/// Fails if the design doc cannot be read or the state cannot be written.
pub fn create_initial_pipeline(doc_path: &Path) -> Result<PipelineState> {
    let doc_text = std::fs::read_to_string(doc_path)
        .with_context(|| format!("Failed to read design doc: {}", doc_path.display()))?;
    let state = PipelineState {
        schema_version: 1,
        design_doc: doc_path.to_string_lossy().to_string(),
        doc_hash: compute_doc_hash(&doc_text),
        stage: "designed".to_string(),
        plans: Vec::new(),
        runs: Vec::new(),
    };
    write_pipeline_state(doc_path, &state)?;
    Ok(state)
}
/// Returns the existing pipeline state for `doc_path`, creating and
/// persisting an initial one when none can be read.
pub fn ensure_pipeline_state(doc_path: &Path) -> Result<PipelineState> {
    match read_pipeline_state(doc_path) {
        Some(existing) => Ok(existing),
        None => create_initial_pipeline(doc_path),
    }
}
/// Records the start of a planning attempt: moves the stage to "planning",
/// refreshes the doc hash (best effort), appends a running `PlanRecord`,
/// and persists the updated state.
///
/// # Errors
/// Fails if the state cannot be loaded/created or written back.
pub fn mark_planning(doc_path: &Path, agent_id: &str, worktree: &str) -> Result<PipelineState> {
    let mut state = ensure_pipeline_state(doc_path)?;
    // Re-hash the doc so staleness is measured from this planning attempt.
    if let Ok(doc_text) = std::fs::read_to_string(doc_path) {
        state.doc_hash = compute_doc_hash(&doc_text);
    }
    state.stage = "planning".to_string();
    let record = PlanRecord {
        agent_id: agent_id.to_string(),
        worktree: worktree.to_string(),
        started_at: chrono::Utc::now().to_rfc3339(),
        completed_at: None,
        status: "running".to_string(),
        blocking_gaps: 0,
        advisory_gaps: 0,
        plan_file: None,
    };
    state.plans.push(record);
    write_pipeline_state(doc_path, &state)?;
    Ok(state)
}
/// Marks the most recent planning attempt by `agent_id` as done, filling in
/// its completion time, gap counts, and plan file, and moves the stage to
/// "planned". A missing matching record is tolerated: the stage still advances.
///
/// # Errors
/// Fails if the state cannot be loaded/created or written back.
#[allow(dead_code)]
pub fn mark_planned(
    doc_path: &Path,
    agent_id: &str,
    blocking_gaps: u32,
    advisory_gaps: u32,
    plan_file: &str,
) -> Result<PipelineState> {
    let mut state = ensure_pipeline_state(doc_path)?;
    state.stage = "planned".to_string();
    // Search from the newest record backwards for this agent's attempt.
    let latest = state.plans.iter_mut().rev().find(|p| p.agent_id == agent_id);
    if let Some(plan) = latest {
        plan.completed_at = Some(chrono::Utc::now().to_rfc3339());
        plan.status = "done".to_string();
        plan.blocking_gaps = blocking_gaps;
        plan.advisory_gaps = advisory_gaps;
        plan.plan_file = Some(plan_file.to_string());
    }
    write_pipeline_state(doc_path, &state)?;
    Ok(state)
}
/// Records the start of an execution run: moves the stage to "running",
/// appends a running `RunRecord`, and persists the updated state.
///
/// # Errors
/// Fails if the state cannot be loaded/created or written back.
pub fn mark_running(
    doc_path: &Path,
    agent_id: &str,
    worktree: &str,
    issue_id: Option<i64>,
) -> Result<PipelineState> {
    let mut state = ensure_pipeline_state(doc_path)?;
    state.stage = "running".to_string();
    let record = RunRecord {
        agent_id: agent_id.to_string(),
        worktree: worktree.to_string(),
        issue_id,
        started_at: chrono::Utc::now().to_rfc3339(),
        completed_at: None,
        status: "running".to_string(),
    };
    state.runs.push(record);
    write_pipeline_state(doc_path, &state)?;
    Ok(state)
}
/// Renders a short human-readable status line for the pipeline's stage,
/// including the latest plan/run agent and a staleness warning for
/// "planned" pipelines whose doc has changed since planning.
pub fn stage_display(pipeline: &PipelineState, doc_path: &Path) -> String {
    match pipeline.stage.as_str() {
        "designed" => "designed".to_string(),
        "planning" => match pipeline.plans.last() {
            Some(plan) => format!("planning \u{27f3} {}", plan.agent_id),
            None => "planning \u{27f3}".to_string(),
        },
        "planned" => {
            // Staleness only matters for the "planned" stage.
            let stale = if is_plan_stale(pipeline, doc_path) {
                " \u{26a0} stale"
            } else {
                ""
            };
            match pipeline.plans.last() {
                Some(plan) => format!(
                    "planned \u{2713}{} {}/{}",
                    stale, plan.blocking_gaps, plan.advisory_gaps
                ),
                None => format!("planned{stale}"),
            }
        }
        "running" => match pipeline.runs.last() {
            Some(run) => format!("running {} \u{27f3}", run.agent_id),
            None => "running \u{27f3}".to_string(),
        },
        "complete" => "complete \u{2713}".to_string(),
        other => other.to_string(),
    }
}
/// Formats how long ago an RFC 3339 timestamp occurred, e.g. "(5m ago)" or
/// "(3h ago)". Returns an empty string for None or an unparsable timestamp.
#[allow(dead_code)]
fn plan_age_display(completed_at: &Option<String>) -> String {
    let parsed = completed_at
        .as_deref()
        .and_then(|ts| chrono::DateTime::parse_from_rfc3339(ts).ok());
    let Some(dt) = parsed else {
        return String::new();
    };
    let mins = chrono::Utc::now()
        .signed_duration_since(dt.with_timezone(&chrono::Utc))
        .num_minutes();
    if mins < 60 {
        format!("({mins}m ago)")
    } else {
        format!("({}h ago)", mins / 60)
    }
}
/// Scans `<repo_root>/.design` for `.md` docs that have a readable pipeline
/// state, returning each doc path paired with its state. A missing or
/// unreadable `.design` directory yields an empty list.
pub fn scan_pipeline_states(repo_root: &Path) -> Vec<(std::path::PathBuf, PipelineState)> {
    let design_dir = repo_root.join(".design");
    // read_dir fails for a missing/non-directory path, covering both cases.
    let Ok(entries) = std::fs::read_dir(&design_dir) else {
        return Vec::new();
    };
    entries
        .flatten()
        .map(|entry| entry.path())
        .filter(|path| path.extension().and_then(|e| e.to_str()) == Some("md"))
        .filter_map(|path| read_pipeline_state(&path).map(|state| (path, state)))
        .collect()
}
/// Lists all `.md` design docs under `<repo_root>/.design`, sorted by path.
/// A missing or unreadable `.design` directory yields an empty list.
pub fn scan_design_docs(repo_root: &Path) -> Vec<std::path::PathBuf> {
    let design_dir = repo_root.join(".design");
    // read_dir fails for a missing/non-directory path, covering both cases.
    let Ok(entries) = std::fs::read_dir(&design_dir) else {
        return Vec::new();
    };
    let mut docs: Vec<std::path::PathBuf> = entries
        .flatten()
        .map(|entry| entry.path())
        .filter(|path| path.extension().and_then(|e| e.to_str()) == Some("md"))
        .collect();
    docs.sort();
    docs
}