Skip to main content

construct/gateway/
api_workflows.rs

1//! REST API handlers for workflow management (`/api/workflows`).
2//!
3//! Each workflow definition is a Kumiho item of kind `"workflow"` in the
4//! `Construct/Workflows` space.  The YAML definition and metadata (description,
5//! version, tags, steps count) are stored as revision metadata.
6//!
7//! Provides:
8//!   - `GET    /api/workflows`              — list workflow definitions
9//!   - `POST   /api/workflows`              — create a new workflow
10//!   - `PUT    /api/workflows/{*kref}`      — update an existing workflow
11//!   - `DELETE /api/workflows/{*kref}`       — delete a workflow
12//!   - `POST   /api/workflows/deprecate`    — toggle deprecation
13//!   - `GET    /api/workflows/runs`         — recent workflow runs (from Kumiho)
14//!   - `GET    /api/workflows/runs/{id}`    — single run detail
15//!   - `GET    /api/workflows/dashboard`    — aggregated stats
16
17use super::AppState;
18use super::api::require_auth;
19use super::api_agents::build_kumiho_client;
20use super::kumiho_client::{ItemResponse, KumihoClient, KumihoError, RevisionResponse, slugify};
21use axum::{
22    extract::{Path, Query, State},
23    http::{HeaderMap, StatusCode},
24    response::{IntoResponse, Json},
25};
26use parking_lot::Mutex;
27use serde::{Deserialize, Serialize};
28use std::collections::HashMap;
29use std::sync::OnceLock;
30use std::time::Instant;
31
// Kumiho space names used by this module. Each one is appended to the harness
// project name to form a `/<project>/<space>` path (see the `*_space_path`
// helpers below).
const WORKFLOW_SPACE_NAME: &str = "Workflows";
const WORKFLOW_RUNS_SPACE_NAME: &str = "WorkflowRuns";
const WORKFLOW_RUN_REQUESTS_SPACE_NAME: &str = "WorkflowRunRequests";
35
36fn workflow_project(state: &AppState) -> String {
37    state.config.lock().kumiho.harness_project.clone()
38}
39
40fn workflow_space_path(state: &AppState) -> String {
41    format!("/{}/{}", workflow_project(state), WORKFLOW_SPACE_NAME)
42}
43
44fn workflow_runs_space_path(state: &AppState) -> String {
45    format!("/{}/{}", workflow_project(state), WORKFLOW_RUNS_SPACE_NAME)
46}
47
48fn workflow_run_requests_space_path(state: &AppState) -> String {
49    format!(
50        "/{}/{}",
51        workflow_project(state),
52        WORKFLOW_RUN_REQUESTS_SPACE_NAME
53    )
54}
55
56// ── Response cache ──────────────────────────────────────────────────────
57
/// One cached snapshot of the workflow list.
struct WorkflowCache {
    /// The cached list entries; cloned out to the caller on a cache hit.
    workflows: Vec<WorkflowResponse>,
    /// The `include_deprecated` flag this snapshot was stored with. A lookup
    /// with a different flag is treated as a miss.
    include_deprecated: bool,
    /// When the snapshot was stored. `invalidate_cache` back-dates this to
    /// force expiry without discarding the data.
    fetched_at: Instant,
}
63
/// Process-wide, lazily initialised slot holding at most one cached snapshot.
static WORKFLOW_CACHE: OnceLock<Mutex<Option<WorkflowCache>>> = OnceLock::new();
/// A snapshot is fresh while its age in whole seconds is below this value.
const CACHE_TTL_SECS: u64 = 3;
66
67fn get_cached(include_deprecated: bool) -> Option<Vec<WorkflowResponse>> {
68    let lock = WORKFLOW_CACHE.get_or_init(|| Mutex::new(None));
69    let cache = lock.lock();
70    if let Some(ref c) = *cache {
71        if c.include_deprecated == include_deprecated
72            && c.fetched_at.elapsed().as_secs() < CACHE_TTL_SECS
73        {
74            return Some(c.workflows.clone());
75        }
76    }
77    None
78}
79
80fn set_cached(workflows: &[WorkflowResponse], include_deprecated: bool) {
81    let lock = WORKFLOW_CACHE.get_or_init(|| Mutex::new(None));
82    let mut cache = lock.lock();
83    *cache = Some(WorkflowCache {
84        workflows: workflows.to_vec(),
85        include_deprecated,
86        fetched_at: Instant::now(),
87    });
88}
89
90fn invalidate_cache() {
91    if let Some(lock) = WORKFLOW_CACHE.get() {
92        let mut cache = lock.lock();
93        // Mark as expired but keep stale data for fallback on API errors
94        if let Some(ref mut c) = *cache {
95            c.fetched_at = Instant::now() - std::time::Duration::from_secs(CACHE_TTL_SECS + 1);
96        }
97    }
98}
99
100// ── Query / request types ───────────────────────────────────────────────
101
/// Query-string parameters for listing workflows.
// NOTE(review): presumably consumed by the `GET /api/workflows` handler, which
// is not visible in this part of the file — confirm.
#[derive(Deserialize)]
pub struct WorkflowListQuery {
    /// Whether deprecated workflows should be included; `false` when omitted.
    #[serde(default)]
    pub include_deprecated: bool,
    /// Optional free-text search term.
    pub q: Option<String>,
}
108
/// JSON body describing a workflow definition to store.
#[derive(Deserialize)]
pub struct CreateWorkflowBody {
    /// Human-readable name; stored as `display_name` revision metadata.
    pub name: String,
    /// Free-text description; stored as `description` revision metadata.
    pub description: String,
    /// The workflow YAML text; stored as `definition` revision metadata.
    pub definition: String,
    /// Optional version string. Not written by `workflow_metadata`.
    #[serde(default)]
    pub version: Option<String>,
    /// Optional tags; stored comma-joined as `tags` metadata when non-empty.
    #[serde(default)]
    pub tags: Option<Vec<String>>,
}
119
/// JSON body for toggling a workflow's deprecation flag.
#[derive(Deserialize)]
pub struct DeprecateBody {
    /// Kumiho reference of the workflow item to change.
    pub kref: String,
    /// The desired deprecation state.
    pub deprecated: bool,
}
125
/// Query-string parameters for listing workflow runs.
#[derive(Deserialize)]
pub struct WorkflowRunsQuery {
    /// Maximum number of runs requested; falls back to `default_limit()` (20)
    /// when omitted.
    #[serde(default = "default_limit")]
    pub limit: usize,
    /// Optional workflow filter.
    // NOTE(review): presumably a workflow name — confirm against the handler.
    #[serde(default)]
    pub workflow: Option<String>,
}
133
/// Serde default for `WorkflowRunsQuery::limit`.
fn default_limit() -> usize {
    const DEFAULT_RUN_LIMIT: usize = 20;
    DEFAULT_RUN_LIMIT
}
137
/// JSON body for requesting a workflow run.
#[derive(Deserialize)]
pub struct RunWorkflowBody {
    /// Arbitrary JSON inputs for the run; JSON `null` when omitted.
    #[serde(default)]
    pub inputs: serde_json::Value,
    /// Optional working directory for the run.
    // NOTE(review): how `cwd` is interpreted is decided by the handler, which
    // is not visible here.
    #[serde(default)]
    pub cwd: Option<String>,
}
145
146// ── Response types ──────────────────────────────────────────────────────
147
/// One workflow definition as returned by the API.
///
/// Built either from a Kumiho item plus its revision metadata
/// (`to_workflow_response`) or from a builtin YAML file on disk
/// (`discover_builtin_workflows`).
#[derive(Serialize, Clone)]
pub struct WorkflowResponse {
    /// Kumiho item kref, or `builtin://<item_name>` for builtin workflows.
    pub kref: String,
    /// Display name: the `display_name` metadata (or the YAML `name` field for
    /// builtins), falling back to the item name / file stem.
    pub name: String,
    /// Kumiho item name; for builtins, the slugified display name.
    pub item_name: String,
    /// Whether the item is deprecated; always `false` for builtins.
    pub deprecated: bool,
    /// Item creation timestamp as reported by Kumiho; `None` for builtins.
    pub created_at: Option<String>,
    pub description: String,
    /// The workflow YAML text.
    pub definition: String,
    /// For Kumiho items, the revision number rendered as a string; for
    /// builtins, the YAML `version` field (default `"1.0"`).
    pub version: String,
    pub tags: Vec<String>,
    /// Number of steps: the `steps` metadata value, or counted from the YAML
    /// for builtins.
    pub steps: usize,
    /// Revision number of the backing revision; `0` when there is none.
    pub revision_number: i32,
    /// `"builtin"` — shipped with Construct, not yet customized.
    /// `"builtin-modified"` — builtin overridden by a Kumiho copy.
    /// `"custom"` — user-created workflow.
    #[serde(default = "default_source")]
    pub source: String,
    /// Triggers parsed from the definition; omitted from JSON when empty.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub triggers: Vec<WorkflowTrigger>,
}
169
/// Serde default for `WorkflowResponse::source`: a user-created workflow.
fn default_source() -> String {
    String::from("custom")
}
173
/// A trigger parsed from a workflow definition's `triggers:` list.
#[derive(Serialize, Clone, Debug)]
pub struct WorkflowTrigger {
    /// Value of the trigger's `on_kind` key.
    pub on_kind: String,
    /// Value of the trigger's `on_tag` key; `extract_triggers` fills in
    /// `"ready"` when the YAML omits it.
    pub on_tag: String,
    /// Value of the optional `on_name_pattern` key; omitted from JSON when
    /// empty.
    #[serde(skip_serializing_if = "String::is_empty")]
    pub on_name_pattern: String,
}
181
/// Summary of one workflow run, assembled from a run item and its revision
/// metadata by `to_run_summary`.
///
/// All metadata-backed fields are passed through as strings and are empty when
/// the corresponding metadata key is missing.
#[derive(Serialize, Clone)]
pub struct WorkflowRunSummary {
    /// Kumiho kref of the run item.
    pub kref: String,
    /// The `run_id` metadata, falling back to the item name.
    pub run_id: String,
    /// The `workflow_name` metadata, falling back to the `workflow` key.
    pub workflow_name: String,
    pub status: String,
    pub started_at: String,
    pub completed_at: String,
    /// The `steps_completed` metadata value, as stored (not parsed).
    pub steps_completed: String,
    /// The `steps_total` metadata value, as stored (not parsed).
    pub steps_total: String,
    pub error: String,
    /// Kumiho item kref of the workflow definition this run used.
    /// Empty for built-in / disk-fallback workflows.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub workflow_item_kref: String,
    /// Kumiho revision kref of the exact workflow YAML this run executed.
    /// The dashboard DAG viewer fetches this revision so the rendered graph
    /// always matches what the run actually ran — independent of later retags.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub workflow_revision_kref: String,
}
203
/// One message of a group-chat transcript attached to a step.
#[derive(Serialize, Clone)]
pub struct TranscriptEntry {
    /// Speaker label; `"?"` when the stored entry has none.
    pub speaker: String,
    /// Message text; empty when the stored entry has none.
    pub content: String,
    /// Round number; `0` when the stored entry has none.
    pub round: u32,
}
210
/// Approval-related fields decoded from a step's `output_data` payload.
///
/// Every field is omitted from the JSON output when absent or empty.
#[derive(Serialize, Clone, Default)]
pub struct ApprovalOutputData {
    /// The payload's `awaiting_approval` boolean, if present.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub awaiting_approval: Option<bool>,
    /// The payload's `approval_message` string, if present.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub approval_message: Option<String>,
    /// String entries of the payload's `approve_keywords` array.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub approve_keywords: Vec<String>,
    /// String entries of the payload's `reject_keywords` array.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub reject_keywords: Vec<String>,
}
222
/// Details of one step of a workflow run, decoded from a `step_<id>` metadata
/// entry by `extract_steps_from_metadata`.
///
/// Empty strings, empty lists and `None` are omitted from the JSON output,
/// except for `step_id` and `status`, which are always present.
#[derive(Serialize, Clone)]
pub struct WorkflowStepDetail {
    /// The metadata key with its `step_` prefix removed.
    pub step_id: String,
    /// The step's `status` field; `"unknown"` when it cannot be determined.
    pub status: String,
    #[serde(skip_serializing_if = "String::is_empty")]
    pub agent_id: String,
    #[serde(skip_serializing_if = "String::is_empty")]
    pub agent_type: String,
    #[serde(skip_serializing_if = "String::is_empty")]
    pub role: String,
    #[serde(skip_serializing_if = "String::is_empty")]
    pub template_name: String,
    #[serde(skip_serializing_if = "String::is_empty")]
    pub output_preview: String,
    /// String entries of the step's `skills` array.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub skills: Vec<String>,
    /// Group-chat transcript, when the step recorded one.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub transcript: Vec<TranscriptEntry>,
    /// Decoded `output_data` payload, when the step has one.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub output_data: Option<ApprovalOutputData>,
}
244
/// A run summary together with its per-step details.
#[derive(Serialize, Clone)]
pub struct WorkflowRunDetail {
    /// Summary fields, flattened into the top level of the JSON object.
    #[serde(flatten)]
    pub summary: WorkflowRunSummary,
    /// Steps decoded from the run revision's metadata.
    pub steps: Vec<WorkflowStepDetail>,
}
251
/// Aggregated workflow statistics returned to the dashboard.
// NOTE(review): the handler that fills this struct is not visible in this part
// of the file; the field notes below are inferred from names — confirm.
#[derive(Serialize)]
pub struct WorkflowDashboard {
    /// Presumably the number of entries in `definitions`.
    pub definitions_count: usize,
    pub definitions: Vec<WorkflowResponse>,
    /// Presumably the number of runs that are still in progress.
    pub active_runs: usize,
    pub recent_runs: Vec<WorkflowRunSummary>,
    pub total_runs: usize,
}
260
261// ── Helpers ─────────────────────────────────────────────────────────────
262
263fn kumiho_err(e: KumihoError) -> (StatusCode, Json<serde_json::Value>) {
264    match &e {
265        KumihoError::Unreachable(_) => (
266            StatusCode::SERVICE_UNAVAILABLE,
267            Json(serde_json::json!({ "error": format!("Kumiho service unavailable: {e}") })),
268        ),
269        KumihoError::Api { status, body } => {
270            let code = if *status == 401 || *status == 403 {
271                StatusCode::BAD_GATEWAY
272            } else {
273                StatusCode::from_u16(*status).unwrap_or(StatusCode::BAD_GATEWAY)
274            };
275            (
276                code,
277                Json(serde_json::json!({ "error": format!("Kumiho upstream: {body}") })),
278            )
279        }
280        KumihoError::Decode(msg) => (
281            StatusCode::BAD_GATEWAY,
282            Json(serde_json::json!({ "error": format!("Bad response from Kumiho: {msg}") })),
283        ),
284    }
285}
286
287fn workflow_metadata(body: &CreateWorkflowBody) -> HashMap<String, String> {
288    let mut meta = HashMap::new();
289    meta.insert("display_name".to_string(), body.name.clone());
290    meta.insert("description".to_string(), body.description.clone());
291    meta.insert("definition".to_string(), body.definition.clone());
292    meta.insert("created_by".to_string(), "construct-dashboard".to_string());
293    // Count steps in the YAML
294    let steps = count_yaml_steps(&body.definition);
295    meta.insert("steps".to_string(), steps.to_string());
296    if let Some(ref tags) = body.tags {
297        if !tags.is_empty() {
298            meta.insert("tags".to_string(), tags.join(","));
299        }
300    }
301    // Full-text search index
302    meta.insert(
303        "_search_text".to_string(),
304        format!("{} {}", body.name, body.description),
305    );
306    meta
307}
308
/// Counts the steps of a workflow YAML without a full YAML parser.
///
/// Scanning starts after the first line whose trimmed text is `steps:` or
/// `tasks:`. From there every line whose trimmed text starts with `- id:` is
/// counted. Scanning stops at the first non-empty line that is neither a list
/// item nor a comment and does not start with a space (i.e. the next
/// top-level key). Returns `0` when no `steps:` / `tasks:` header is found.
fn count_yaml_steps(content: &str) -> usize {
    let is_header = |text: &str| matches!(text, "steps:" | "tasks:");

    let mut lines = content.lines();
    // Consume everything up to and including the first header line.
    if !lines.by_ref().any(|raw| is_header(raw.trim())) {
        return 0;
    }

    let mut total = 0;
    for raw in lines {
        let text = raw.trim();
        if is_header(text) {
            continue;
        }
        if text.starts_with("- id:") {
            total += 1;
            continue;
        }
        let stays_in_block = text.is_empty()
            || text.starts_with('-')
            || text.starts_with('#')
            || raw.starts_with(' ');
        if !stays_in_block {
            break;
        }
    }
    total
}
334
335fn to_workflow_response(item: &ItemResponse, rev: Option<&RevisionResponse>) -> WorkflowResponse {
336    let meta = rev.map(|r| &r.metadata);
337    let get = |key: &str| -> String { meta.and_then(|m| m.get(key)).cloned().unwrap_or_default() };
338    let tags_str = get("tags");
339    let tags: Vec<String> = if tags_str.is_empty() {
340        Vec::new()
341    } else {
342        tags_str.split(',').map(|s| s.trim().to_string()).collect()
343    };
344    let steps: usize = get("steps").parse().unwrap_or(0);
345
346    let display_name = {
347        let n = get("display_name");
348        if n.is_empty() {
349            item.item_name.clone()
350        } else {
351            n
352        }
353    };
354
355    let definition = get("definition");
356    let triggers = extract_triggers(&definition);
357
358    WorkflowResponse {
359        kref: item.kref.clone(),
360        name: display_name,
361        item_name: item.item_name.clone(),
362        deprecated: item.deprecated,
363        created_at: item.created_at.clone(),
364        description: get("description"),
365        definition,
366        version: format!("{}", rev.map(|r| r.number).unwrap_or(0)),
367        tags,
368        steps,
369        revision_number: rev.map(|r| r.number).unwrap_or(0),
370        source: "custom".to_string(),
371        triggers,
372    }
373}
374
375/// Prefer the `workflow.yaml` artifact on disk as the canonical definition,
376/// falling back to inline `definition` metadata only when no artifact exists.
377///
378/// The inline `definition` metadata is a legacy gateway-authored field that
379/// drifts from the artifact for operator-authored revisions and can also be
380/// truncated by Kumiho's batch endpoint for large YAMLs. The artifact file is
381/// the source of truth, so we always overwrite metadata with it when present.
382async fn prefer_artifact_definitions(
383    client: &super::kumiho_client::KumihoClient,
384    revs: &mut HashMap<String, RevisionResponse>,
385) {
386    for rev in revs.values_mut() {
387        if let Ok(artifact) = client
388            .get_artifact_by_name(&rev.kref, "workflow.yaml")
389            .await
390        {
391            let path = artifact
392                .location
393                .strip_prefix("file://")
394                .unwrap_or(&artifact.location);
395            if let Ok(yaml) = tokio::fs::read_to_string(path).await {
396                rev.metadata.insert("definition".to_string(), yaml);
397            }
398        }
399    }
400}
401
402async fn enrich_items(
403    client: &super::kumiho_client::KumihoClient,
404    items: Vec<ItemResponse>,
405) -> Vec<WorkflowResponse> {
406    // Only include items with kind == "workflow" — filter out stray items
407    // that agents may have created in the Workflows space.
408    let items: Vec<ItemResponse> = items.into_iter().filter(|i| i.kind == "workflow").collect();
409
410    if items.is_empty() {
411        return Vec::new();
412    }
413
414    let krefs: Vec<String> = items.iter().map(|i| i.kref.clone()).collect();
415
416    if let Ok(mut rev_map) = client.batch_get_revisions(&krefs, "published").await {
417        let missing: Vec<String> = krefs
418            .iter()
419            .filter(|k| !rev_map.contains_key(*k))
420            .cloned()
421            .collect();
422        let mut latest_map = if !missing.is_empty() {
423            client
424                .batch_get_revisions(&missing, "latest")
425                .await
426                .unwrap_or_default()
427        } else {
428            HashMap::new()
429        };
430
431        // Artifact-first: the `workflow.yaml` on disk is canonical. The inline
432        // `definition` metadata drifts for operator-authored revisions and is
433        // truncated by Kumiho's batch endpoint for large YAMLs, so we always
434        // prefer the artifact when it exists — same logic the single-revision
435        // endpoint uses.
436        prefer_artifact_definitions(client, &mut rev_map).await;
437        prefer_artifact_definitions(client, &mut latest_map).await;
438
439        return items
440            .iter()
441            .map(|item| {
442                let rev = rev_map
443                    .get(&item.kref)
444                    .or_else(|| latest_map.get(&item.kref));
445                to_workflow_response(item, rev)
446            })
447            .collect();
448    }
449
450    // Fallback: sequential
451    let mut workflows = Vec::with_capacity(items.len());
452    for item in &items {
453        let rev = client.get_published_or_latest(&item.kref).await.ok();
454        workflows.push(to_workflow_response(item, rev.as_ref()));
455    }
456    workflows
457}
458
459fn to_run_summary(item: &ItemResponse, rev: Option<&RevisionResponse>) -> WorkflowRunSummary {
460    let meta = rev.map(|r| &r.metadata);
461    let get = |key: &str| -> String { meta.and_then(|m| m.get(key)).cloned().unwrap_or_default() };
462
463    let run_id_meta = get("run_id");
464    WorkflowRunSummary {
465        kref: item.kref.clone(),
466        run_id: if run_id_meta.is_empty() {
467            item.item_name.clone()
468        } else {
469            run_id_meta
470        },
471        workflow_name: {
472            let wn = get("workflow_name");
473            if wn.is_empty() { get("workflow") } else { wn }
474        },
475        status: get("status"),
476        started_at: get("started_at"),
477        completed_at: get("completed_at"),
478        steps_completed: get("steps_completed"),
479        steps_total: get("steps_total"),
480        error: get("error"),
481        workflow_item_kref: get("workflow_item_kref"),
482        workflow_revision_kref: get("workflow_revision_kref"),
483    }
484}
485
486fn extract_steps_from_metadata(meta: &HashMap<String, String>) -> Vec<WorkflowStepDetail> {
487    // Skip known non-step metadata keys that happen to start with "step_"
488    const SKIP_KEYS: &[&str] = &["step_count", "steps_completed", "steps_total"];
489
490    let mut steps = Vec::new();
491    for (key, value) in meta {
492        if SKIP_KEYS.contains(&key.as_str()) {
493            continue;
494        }
495        if let Some(step_id) = key.strip_prefix("step_") {
496            // Value should be JSON object: {"status":"completed","output_preview":"...","agent_id":"..."}
497            // Legacy runs may have truncated JSON — fall back to regex extraction.
498            if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(value) {
499                // Only accept JSON objects (skip plain numbers/strings)
500                if !parsed.is_object() {
501                    continue;
502                }
503                let skills = parsed
504                    .get("skills")
505                    .and_then(|v| v.as_array())
506                    .map(|arr| {
507                        arr.iter()
508                            .filter_map(|s| s.as_str().map(|s| s.to_string()))
509                            .collect::<Vec<_>>()
510                    })
511                    .unwrap_or_default();
512                // Group chat transcript — stored as JSON string of array
513                let transcript = parsed
514                    .get("transcript")
515                    .and_then(|v| v.as_str())
516                    .and_then(|s| serde_json::from_str::<Vec<serde_json::Value>>(s).ok())
517                    .map(|arr| {
518                        arr.iter()
519                            .map(|entry| TranscriptEntry {
520                                speaker: entry
521                                    .get("speaker")
522                                    .and_then(|v| v.as_str())
523                                    .unwrap_or("?")
524                                    .to_string(),
525                                content: entry
526                                    .get("content")
527                                    .and_then(|v| v.as_str())
528                                    .unwrap_or("")
529                                    .to_string(),
530                                round: entry.get("round").and_then(|v| v.as_u64()).unwrap_or(0)
531                                    as u32,
532                            })
533                            .collect::<Vec<_>>()
534                    })
535                    .unwrap_or_default();
536                // Decode output_data for approval steps
537                let output_data = parsed.get("output_data").and_then(|v| {
538                    // output_data may be a JSON string or an embedded object
539                    let obj = if let Some(s) = v.as_str() {
540                        serde_json::from_str::<serde_json::Value>(s).ok()
541                    } else {
542                        Some(v.clone())
543                    };
544                    obj.map(|o| ApprovalOutputData {
545                        awaiting_approval: o.get("awaiting_approval").and_then(|v| v.as_bool()),
546                        approval_message: o
547                            .get("approval_message")
548                            .and_then(|v| v.as_str())
549                            .map(String::from),
550                        approve_keywords: o
551                            .get("approve_keywords")
552                            .and_then(|v| v.as_array())
553                            .map(|arr| {
554                                arr.iter()
555                                    .filter_map(|s| s.as_str().map(String::from))
556                                    .collect()
557                            })
558                            .unwrap_or_default(),
559                        reject_keywords: o
560                            .get("reject_keywords")
561                            .and_then(|v| v.as_array())
562                            .map(|arr| {
563                                arr.iter()
564                                    .filter_map(|s| s.as_str().map(String::from))
565                                    .collect()
566                            })
567                            .unwrap_or_default(),
568                    })
569                });
570                steps.push(WorkflowStepDetail {
571                    step_id: step_id.to_string(),
572                    status: parsed
573                        .get("status")
574                        .and_then(|v| v.as_str())
575                        .unwrap_or("unknown")
576                        .to_string(),
577                    agent_id: parsed
578                        .get("agent_id")
579                        .and_then(|v| v.as_str())
580                        .unwrap_or("")
581                        .to_string(),
582                    agent_type: parsed
583                        .get("agent_type")
584                        .and_then(|v| v.as_str())
585                        .unwrap_or("")
586                        .to_string(),
587                    role: parsed
588                        .get("role")
589                        .and_then(|v| v.as_str())
590                        .unwrap_or("")
591                        .to_string(),
592                    template_name: parsed
593                        .get("template_name")
594                        .and_then(|v| v.as_str())
595                        .unwrap_or("")
596                        .to_string(),
597                    output_preview: parsed
598                        .get("output_preview")
599                        .and_then(|v| v.as_str())
600                        .unwrap_or("")
601                        .to_string(),
602                    skills,
603                    transcript,
604                    output_data,
605                });
606            } else if value.contains(r#""status""#) {
607                // Truncated JSON fallback: extract status with simple string search
608                let status = if let Some(start) = value.find(r#""status": ""#) {
609                    let rest = &value[start + 11..];
610                    rest.split('"').next().unwrap_or("unknown")
611                } else {
612                    "unknown"
613                };
614                steps.push(WorkflowStepDetail {
615                    step_id: step_id.to_string(),
616                    status: status.to_string(),
617                    agent_id: String::new(),
618                    agent_type: String::new(),
619                    role: String::new(),
620                    template_name: String::new(),
621                    output_preview: String::new(),
622                    skills: Vec::new(),
623                    transcript: Vec::new(),
624                    output_data: None,
625                });
626            }
627        }
628    }
629    steps
630}
631
632fn to_run_detail(item: &ItemResponse, rev: Option<&RevisionResponse>) -> WorkflowRunDetail {
633    let summary = to_run_summary(item, rev);
634    let steps = rev
635        .map(|r| extract_steps_from_metadata(&r.metadata))
636        .unwrap_or_default();
637    WorkflowRunDetail { summary, steps }
638}
639
640// ── Builtin workflow discovery ──────────────────────────────────────────
641
/// Default directory containing builtin workflow YAML files, relative to the
/// user's home directory (it is joined onto the home path by
/// `discover_builtin_workflows`).
const BUILTIN_WORKFLOWS_DIR: &str = ".construct/operator_mcp/workflow/builtins";
644
645/// Discover builtin workflow YAML files from `~/BUILTIN_WORKFLOWS_DIR`.
646///
647/// Returns a vec of `WorkflowResponse` entries with `source = "builtin"`.
648fn discover_builtin_workflows() -> Vec<WorkflowResponse> {
649    let home = directories::UserDirs::new()
650        .map(|u| u.home_dir().to_path_buf())
651        .unwrap_or_default();
652    let builtins_dir = home.join(BUILTIN_WORKFLOWS_DIR);
653    let Ok(entries) = std::fs::read_dir(&builtins_dir) else {
654        return Vec::new();
655    };
656
657    let mut workflows = Vec::new();
658    for entry in entries.flatten() {
659        let path = entry.path();
660        let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
661        if ext != "yaml" && ext != "yml" {
662            continue;
663        }
664        let Ok(content) = std::fs::read_to_string(&path) else {
665            continue;
666        };
667        // Extract name, description, tags from YAML frontmatter (lightweight parse)
668        let name = extract_yaml_field(&content, "name").unwrap_or_else(|| {
669            path.file_stem()
670                .unwrap_or_default()
671                .to_string_lossy()
672                .into_owned()
673        });
674        let description = extract_yaml_field(&content, "description").unwrap_or_default();
675        let version = extract_yaml_field(&content, "version").unwrap_or_else(|| "1.0".into());
676        let tags_str = extract_yaml_field(&content, "tags").unwrap_or_default();
677        let tags: Vec<String> = if tags_str.is_empty() {
678            Vec::new()
679        } else {
680            // Parse [tag1, tag2] format
681            tags_str
682                .trim_start_matches('[')
683                .trim_end_matches(']')
684                .split(',')
685                .map(|s| s.trim().trim_matches('"').trim_matches('\'').to_string())
686                .filter(|s| !s.is_empty())
687                .collect()
688        };
689        let steps = count_yaml_steps(&content);
690        let item_name = slugify(&name);
691
692        let triggers = extract_triggers(&content);
693        workflows.push(WorkflowResponse {
694            kref: format!("builtin://{item_name}"),
695            name,
696            item_name,
697            deprecated: false,
698            created_at: None,
699            description,
700            definition: content,
701            version,
702            tags,
703            steps,
704            revision_number: 0,
705            source: "builtin".to_string(),
706            triggers,
707        });
708    }
709    workflows
710}
711
/// Looks up a scalar `field: value` entry in the header of a workflow YAML
/// (lightweight line scan, no full parser).
///
/// Lines are compared after trimming, so indentation is ignored. The value
/// has surrounding whitespace and any leading/trailing `"` or `'` characters
/// removed; an empty value does not count as a match. The scan ends at the
/// first `steps:` or `inputs:` line, so only the header is searched.
fn extract_yaml_field(content: &str, field: &str) -> Option<String> {
    for text in content.lines().map(str::trim) {
        let value = text
            .strip_prefix(field)
            .and_then(|tail| tail.strip_prefix(':'))
            .map(|raw| raw.trim().trim_matches('"').trim_matches('\''))
            .filter(|cleaned| !cleaned.is_empty());
        if let Some(found) = value {
            return Some(found.to_owned());
        }
        // Past the header — stop looking.
        if matches!(text, "steps:" | "inputs:") {
            return None;
        }
    }
    None
}
733
734/// Extract trigger definitions from a YAML workflow definition (lightweight, no full parser).
735///
736/// Expects a `triggers:` top-level key containing a list of mappings with `on_kind`,
737/// optional `on_tag` (defaults to `"ready"`), and optional `on_name_pattern`.
738fn extract_triggers(content: &str) -> Vec<WorkflowTrigger> {
739    let mut triggers = Vec::new();
740    let mut in_triggers = false;
741    let mut current_kind = String::new();
742    let mut current_tag = String::new();
743    let mut current_pattern = String::new();
744
745    for line in content.lines() {
746        let trimmed = line.trim();
747        if trimmed == "triggers:" {
748            in_triggers = true;
749            continue;
750        }
751        if !in_triggers {
752            continue;
753        }
754        // A non-indented, non-empty, non-comment line means we left the triggers block
755        if !trimmed.is_empty()
756            && !trimmed.starts_with('-')
757            && !trimmed.starts_with('#')
758            && !line.starts_with(' ')
759            && !line.starts_with('\t')
760        {
761            break;
762        }
763        // New list item — flush previous if any
764        if trimmed.starts_with("- ") {
765            if !current_kind.is_empty() {
766                triggers.push(WorkflowTrigger {
767                    on_kind: std::mem::take(&mut current_kind),
768                    on_tag: if current_tag.is_empty() {
769                        "ready".to_string()
770                    } else {
771                        std::mem::take(&mut current_tag)
772                    },
773                    on_name_pattern: std::mem::take(&mut current_pattern),
774                });
775            }
776            // Parse inline key on the `- ` line (e.g. `- on_kind: model`)
777            let after_dash = trimmed.strip_prefix("- ").unwrap_or("");
778            if let Some((k, v)) = after_dash.split_once(':') {
779                let k = k.trim();
780                let v = v.trim().trim_matches('"').trim_matches('\'');
781                match k {
782                    "on_kind" => current_kind = v.to_string(),
783                    "on_tag" => current_tag = v.to_string(),
784                    "on_name_pattern" => current_pattern = v.to_string(),
785                    _ => {}
786                }
787            }
788            continue;
789        }
790        // Continuation key within a list item
791        if let Some((k, v)) = trimmed.split_once(':') {
792            let k = k.trim();
793            let v = v.trim().trim_matches('"').trim_matches('\'');
794            match k {
795                "on_kind" => current_kind = v.to_string(),
796                "on_tag" => current_tag = v.to_string(),
797                "on_name_pattern" => current_pattern = v.to_string(),
798                _ => {}
799            }
800        }
801    }
802    // Flush last trigger
803    if !current_kind.is_empty() {
804        triggers.push(WorkflowTrigger {
805            on_kind: current_kind,
806            on_tag: if current_tag.is_empty() {
807                "ready".to_string()
808            } else {
809                current_tag
810            },
811            on_name_pattern: current_pattern,
812        });
813    }
814    triggers
815}
816
/// Extract cron trigger expressions from a workflow YAML definition (lightweight, no full parser).
///
/// Scans the lines that follow a `triggers:` key for list items carrying a `cron:` field and an
/// optional `timezone:` (or `tz:`) field.  Scanning stops at the first non-empty line at column
/// zero that is neither a list item nor a comment.  List items without a `cron:` value are
/// skipped, and their `timezone` is discarded rather than carried over to the next item.
///
/// Values may be wrapped in single or double quotes; a trailing YAML comment (`#` preceded by
/// whitespace, outside a leading quoted scalar) is ignored.
///
/// Returns `Vec<(cron_expression, optional_timezone)>` in document order.
fn extract_cron_triggers(content: &str) -> Vec<(String, Option<String>)> {
    /// Reduce the raw text after `key:` to its scalar value: drop a trailing comment,
    /// surrounding whitespace, and surrounding quotes.
    fn scalar_value(raw: &str) -> &str {
        let mut open_quote: Option<char> = None;
        let mut seen_content = false;
        let mut prev_is_space = false;
        let mut end = raw.len();
        for (idx, ch) in raw.char_indices() {
            if let Some(q) = open_quote {
                if ch == q {
                    open_quote = None;
                }
            } else if ch == '#' && prev_is_space {
                // YAML comments must be separated from the value by whitespace, so
                // expressions such as `6#3` are left intact.
                end = idx;
                break;
            } else if !seen_content && (ch == '"' || ch == '\'') {
                // Quotes only delimit a scalar when they are its first character.
                open_quote = Some(ch);
            }
            if !ch.is_whitespace() {
                seen_content = true;
            }
            prev_is_space = ch.is_whitespace();
        }
        raw[..end].trim().trim_matches('"').trim_matches('\'')
    }

    let mut results = Vec::new();
    let mut in_triggers = false;
    let mut current_cron = String::new();
    let mut current_tz: Option<String> = None;

    for line in content.lines() {
        let trimmed = line.trim();
        if trimmed == "triggers:" {
            in_triggers = true;
            continue;
        }
        if !in_triggers {
            continue;
        }
        // A non-indented, non-empty, non-comment line means we left the triggers block
        let at_column_zero = !line.starts_with(' ') && !line.starts_with('\t');
        if at_column_zero
            && !trimmed.is_empty()
            && !trimmed.starts_with('-')
            && !trimmed.starts_with('#')
        {
            break;
        }

        // A `- ` prefix starts a new list item: flush the previous one, then parse the
        // inline key (e.g. `- cron: "0 9 * * *"`) exactly like a continuation key.
        let entry = match trimmed.strip_prefix("- ") {
            Some(rest) => {
                // Always reset the timezone so it cannot leak from an item without `cron`.
                let tz = current_tz.take();
                if !current_cron.is_empty() {
                    results.push((std::mem::take(&mut current_cron), tz));
                }
                rest
            }
            None => trimmed,
        };

        if let Some((key, raw_value)) = entry.split_once(':') {
            let value = scalar_value(raw_value);
            match key.trim() {
                "cron" if !value.is_empty() => current_cron = value.to_string(),
                "timezone" | "tz" if !value.is_empty() => current_tz = Some(value.to_string()),
                _ => {}
            }
        }
    }

    // Flush last trigger
    if !current_cron.is_empty() {
        results.push((current_cron, current_tz));
    }
    results
}
880
881/// Sync cron triggers for a single workflow to the cron scheduler.
882///
883/// Removes any existing cron jobs for this workflow and re-creates them from
884/// the triggers found in the current YAML definition.
885/// Write the workflow YAML to ~/.construct/workflows/ and register a Kumiho artifact.
886async fn persist_workflow_artifact(
887    client: &KumihoClient,
888    revision_kref: &str,
889    revision_number: i32,
890    workflow_name: &str,
891    definition: &str,
892) {
893    let home = directories::UserDirs::new()
894        .map(|u| u.home_dir().to_path_buf())
895        .unwrap_or_default();
896    let dir = home.join(".construct/workflows");
897    let _ = tokio::fs::create_dir_all(&dir).await;
898
899    let slug = slugify(workflow_name);
900    let file_path = dir.join(format!("{slug}.r{revision_number}.yaml"));
901    let location = format!("file://{}", file_path.display());
902
903    if let Err(e) = tokio::fs::write(&file_path, definition).await {
904        tracing::warn!("Failed to write workflow YAML for {workflow_name}: {e}");
905        return;
906    }
907
908    if let Err(e) = client
909        .create_artifact(revision_kref, "workflow.yaml", &location, HashMap::new())
910        .await
911    {
912        tracing::warn!("Failed to create artifact for workflow {workflow_name}: {e}");
913    } else {
914        tracing::info!("Persisted workflow artifact: {location}");
915    }
916}
917
918fn sync_cron_for_workflow(state: &AppState, workflow_name: &str, definition: &str) {
919    let cron_triggers = extract_cron_triggers(definition);
920    let config = state.config.lock();
921
922    // Remove existing cron jobs for this workflow first
923    if let Err(e) = crate::cron::remove_workflow_cron_jobs(&config, workflow_name) {
924        tracing::warn!("Failed to remove old cron jobs for workflow {workflow_name}: {e}");
925    }
926
927    if cron_triggers.is_empty() {
928        return;
929    }
930
931    let wf_crons: Vec<(String, String, Option<String>)> = cron_triggers
932        .into_iter()
933        .map(|(expr, tz)| (workflow_name.to_string(), expr, tz))
934        .collect();
935
936    if let Err(e) = crate::cron::sync_workflow_cron_jobs(&config, &wf_crons) {
937        tracing::warn!("Failed to sync cron triggers for workflow {workflow_name}: {e}");
938    }
939}
940
941/// Merge builtin workflows with Kumiho workflows.
942///
943/// - Builtins whose `item_name` matches a Kumiho item are marked `"builtin-modified"`.
944/// - Unmatched builtins are included as `"builtin"`.
945/// - Kumiho-only workflows remain `"custom"`.
946fn merge_with_builtins(mut kumiho_workflows: Vec<WorkflowResponse>) -> Vec<WorkflowResponse> {
947    let builtins = discover_builtin_workflows();
948    if builtins.is_empty() {
949        return kumiho_workflows;
950    }
951
952    let builtin_names: std::collections::HashSet<String> =
953        builtins.iter().map(|b| b.item_name.clone()).collect();
954
955    // Tag Kumiho workflows that override a builtin
956    for wf in &mut kumiho_workflows {
957        if builtin_names.contains(&wf.item_name) {
958            wf.source = "builtin-modified".to_string();
959        }
960    }
961
962    // Add builtins that have no Kumiho override
963    let kumiho_names: std::collections::HashSet<String> = kumiho_workflows
964        .iter()
965        .map(|w| w.item_name.clone())
966        .collect();
967    for builtin in builtins {
968        if !kumiho_names.contains(&builtin.item_name) {
969            kumiho_workflows.push(builtin);
970        }
971    }
972
973    kumiho_workflows
974}
975
976// ── Definition Handlers ─────────────────────────────────────────────────
977
978/// GET /api/workflows
979pub async fn handle_list_workflows(
980    State(state): State<AppState>,
981    headers: HeaderMap,
982    Query(query): Query<WorkflowListQuery>,
983) -> impl IntoResponse {
984    if let Err(e) = require_auth(&state, &headers) {
985        return e.into_response();
986    }
987
988    let client = build_kumiho_client(&state);
989    let project = workflow_project(&state);
990    let space_path = workflow_space_path(&state);
991
992    // Return cached result if available (before making API call)
993    if query.q.is_none() {
994        if let Some(cached) = get_cached(query.include_deprecated) {
995            return Json(serde_json::json!({ "workflows": cached })).into_response();
996        }
997    }
998
999    let items_result = if let Some(ref q) = query.q {
1000        client
1001            .search_items(q, &project, "workflow", query.include_deprecated)
1002            .await
1003            .map(|results| results.into_iter().map(|sr| sr.item).collect::<Vec<_>>())
1004    } else {
1005        client
1006            .list_items(&space_path, query.include_deprecated)
1007            .await
1008    };
1009
1010    match items_result {
1011        Ok(items) => {
1012            let workflows = merge_with_builtins(enrich_items(&client, items).await);
1013            if query.q.is_none() {
1014                set_cached(&workflows, query.include_deprecated);
1015            }
1016            Json(serde_json::json!({ "workflows": workflows })).into_response()
1017        }
1018        Err(ref e) if matches!(e, KumihoError::Api { status: 404, .. }) => {
1019            let _ = client.ensure_project(&project).await;
1020            let _ = client.ensure_space(&project, WORKFLOW_SPACE_NAME).await;
1021            let workflows = merge_with_builtins(Vec::new());
1022            Json(serde_json::json!({ "workflows": workflows })).into_response()
1023        }
1024        Err(e) => {
1025            // On API error, try to return stale cache rather than an error
1026            if query.q.is_none() {
1027                let lock = WORKFLOW_CACHE.get_or_init(|| Mutex::new(None));
1028                let cache = lock.lock();
1029                if let Some(ref c) = *cache {
1030                    tracing::warn!("Workflows list failed, returning stale cache: {e}");
1031                    return Json(serde_json::json!({ "workflows": c.workflows })).into_response();
1032                }
1033            }
1034            kumiho_err(e).into_response()
1035        }
1036    }
1037}
1038
1039/// POST /api/workflows
1040pub async fn handle_create_workflow(
1041    State(state): State<AppState>,
1042    headers: HeaderMap,
1043    Json(body): Json<CreateWorkflowBody>,
1044) -> impl IntoResponse {
1045    if let Err(e) = require_auth(&state, &headers) {
1046        return e.into_response();
1047    }
1048
1049    let client = build_kumiho_client(&state);
1050    let project = workflow_project(&state);
1051    let space_path = workflow_space_path(&state);
1052
1053    if let Err(e) = client.ensure_project(&project).await {
1054        return kumiho_err(e).into_response();
1055    }
1056    if let Err(e) = client.ensure_space(&project, WORKFLOW_SPACE_NAME).await {
1057        return kumiho_err(e).into_response();
1058    }
1059
1060    let slug = slugify(&body.name);
1061    let item = match client
1062        .create_item(&space_path, &slug, "workflow", HashMap::new())
1063        .await
1064    {
1065        Ok(item) => item,
1066        Err(e) => return kumiho_err(e).into_response(),
1067    };
1068
1069    let metadata = workflow_metadata(&body);
1070    let rev = match client.create_revision(&item.kref, metadata).await {
1071        Ok(rev) => rev,
1072        Err(e) => return kumiho_err(e).into_response(),
1073    };
1074
1075    // Persist YAML to disk and register artifact BEFORE publishing (published revisions are immutable)
1076    persist_workflow_artifact(&client, &rev.kref, rev.number, &body.name, &body.definition).await;
1077    let _ = client.tag_revision(&rev.kref, "published").await;
1078
1079    invalidate_cache();
1080    sync_cron_for_workflow(&state, &body.name, &body.definition);
1081
1082    let workflow = to_workflow_response(&item, Some(&rev));
1083    (
1084        StatusCode::CREATED,
1085        Json(serde_json::json!({ "workflow": workflow })),
1086    )
1087        .into_response()
1088}
1089
1090/// PUT /api/workflows/{*kref}
1091pub async fn handle_update_workflow(
1092    State(state): State<AppState>,
1093    headers: HeaderMap,
1094    Path(kref): Path<String>,
1095    Json(body): Json<CreateWorkflowBody>,
1096) -> impl IntoResponse {
1097    if let Err(e) = require_auth(&state, &headers) {
1098        return e.into_response();
1099    }
1100
1101    let kref = format!("kref://{kref}");
1102    let client = build_kumiho_client(&state);
1103
1104    let metadata = workflow_metadata(&body);
1105    let rev = match client.create_revision(&kref, metadata).await {
1106        Ok(rev) => rev,
1107        Err(e) => return kumiho_err(e).into_response(),
1108    };
1109
1110    // Persist YAML to disk and register artifact BEFORE publishing (published revisions are immutable)
1111    persist_workflow_artifact(&client, &rev.kref, rev.number, &body.name, &body.definition).await;
1112    let _ = client.tag_revision(&rev.kref, "published").await;
1113
1114    let items = match client.list_items(&workflow_space_path(&state), true).await {
1115        Ok(items) => items,
1116        Err(e) => return kumiho_err(e).into_response(),
1117    };
1118
1119    invalidate_cache();
1120    sync_cron_for_workflow(&state, &body.name, &body.definition);
1121
1122    let item = items.iter().find(|i| i.kref == kref);
1123    match item {
1124        Some(item) => {
1125            let workflow = to_workflow_response(item, Some(&rev));
1126            Json(serde_json::json!({ "workflow": workflow })).into_response()
1127        }
1128        None => {
1129            let fallback = ItemResponse {
1130                kref: kref.clone(),
1131                name: body.name.clone(),
1132                item_name: body.name.clone(),
1133                kind: "workflow".to_string(),
1134                deprecated: false,
1135                created_at: None,
1136                metadata: HashMap::new(),
1137            };
1138            let workflow = to_workflow_response(&fallback, Some(&rev));
1139            Json(serde_json::json!({ "workflow": workflow })).into_response()
1140        }
1141    }
1142}
1143
1144/// POST /api/workflows/deprecate
1145pub async fn handle_deprecate_workflow(
1146    State(state): State<AppState>,
1147    headers: HeaderMap,
1148    Json(body): Json<DeprecateBody>,
1149) -> impl IntoResponse {
1150    if let Err(e) = require_auth(&state, &headers) {
1151        return e.into_response();
1152    }
1153
1154    let kref = body.kref.clone();
1155    let client = build_kumiho_client(&state);
1156
1157    match client.deprecate_item(&kref, body.deprecated).await {
1158        Ok(item) => {
1159            invalidate_cache();
1160            let rev = client.get_published_or_latest(&kref).await.ok();
1161
1162            // Sync cron triggers: remove when deprecating, re-add when restoring.
1163            if body.deprecated {
1164                // Remove cron jobs for this workflow
1165                if let Some(item_segment) = kref.split('/').last() {
1166                    let workflow_name = item_segment
1167                        .rsplit_once('.')
1168                        .map(|(name, _kind)| name)
1169                        .unwrap_or(item_segment);
1170                    let config = state.config.lock();
1171                    if let Err(e) = crate::cron::remove_workflow_cron_jobs(&config, workflow_name) {
1172                        tracing::warn!("Failed to remove cron jobs for deprecated workflow: {e}");
1173                    }
1174                }
1175            } else if let Some(ref rev) = rev {
1176                // Restoring — re-sync cron triggers from the definition
1177                if let Some(definition) = rev.metadata.get("definition") {
1178                    sync_cron_for_workflow(&state, &item.item_name, definition);
1179                }
1180            }
1181
1182            let workflow = to_workflow_response(&item, rev.as_ref());
1183            Json(serde_json::json!({ "workflow": workflow })).into_response()
1184        }
1185        Err(e) => kumiho_err(e).into_response(),
1186    }
1187}
1188
1189/// DELETE /api/workflows/{*kref}
1190pub async fn handle_delete_workflow(
1191    State(state): State<AppState>,
1192    headers: HeaderMap,
1193    Path(kref): Path<String>,
1194) -> impl IntoResponse {
1195    if let Err(e) = require_auth(&state, &headers) {
1196        return e.into_response();
1197    }
1198
1199    let kref = format!("kref://{kref}");
1200    let client = build_kumiho_client(&state);
1201
1202    match client.delete_item(&kref).await {
1203        Ok(()) => {
1204            invalidate_cache();
1205
1206            // Remove associated cron jobs.  Extract the item_name from the kref
1207            // (the last path segment minus the `.workflow` kind suffix) and use
1208            // it as the workflow name for cron cleanup.
1209            if let Some(item_segment) = kref.split('/').last() {
1210                let workflow_name = item_segment
1211                    .rsplit_once('.')
1212                    .map(|(name, _kind)| name)
1213                    .unwrap_or(item_segment);
1214                let config = state.config.lock();
1215                if let Err(e) = crate::cron::remove_workflow_cron_jobs(&config, workflow_name) {
1216                    tracing::warn!("Failed to remove cron jobs for deleted workflow: {e}");
1217                }
1218            }
1219
1220            StatusCode::NO_CONTENT.into_response()
1221        }
1222        Err(e) => kumiho_err(e).into_response(),
1223    }
1224}
1225
1226/// POST /api/workflows/run/{name}
1227///
1228/// Triggers a workflow run request.  Creates a `workflow-run-request` item in
1229/// Kumiho so the scheduler or operator can pick it up.
1230pub async fn handle_run_workflow(
1231    State(state): State<AppState>,
1232    headers: HeaderMap,
1233    Path(name): Path<String>,
1234    body: Option<Json<RunWorkflowBody>>,
1235) -> impl IntoResponse {
1236    if let Err(e) = require_auth(&state, &headers) {
1237        return e.into_response();
1238    }
1239
1240    let inputs = body
1241        .as_ref()
1242        .map(|b| b.inputs.clone())
1243        .unwrap_or(serde_json::Value::Object(Default::default()));
1244    let cwd = body.as_ref().and_then(|b| b.cwd.clone());
1245
1246    let run_id = uuid::Uuid::new_v4().to_string();
1247    let now = chrono::Utc::now().to_rfc3339();
1248
1249    let client = build_kumiho_client(&state);
1250    let project = workflow_project(&state);
1251    let requests_space_path = workflow_run_requests_space_path(&state);
1252
1253    // Ensure the WorkflowRunRequests space exists
1254    let _ = client.ensure_project(&project).await;
1255    let _ = client
1256        .ensure_space(&project, WORKFLOW_RUN_REQUESTS_SPACE_NAME)
1257        .await;
1258
1259    let mut metadata = HashMap::new();
1260    metadata.insert("workflow_name".to_string(), name.clone());
1261    metadata.insert("run_id".to_string(), run_id.clone());
1262    metadata.insert("inputs".to_string(), inputs.to_string());
1263    metadata.insert("cwd".to_string(), cwd.unwrap_or_default());
1264    metadata.insert("trigger_source".to_string(), "api".to_string());
1265    metadata.insert("requested_at".to_string(), now);
1266
1267    let item_name = format!("run-{}", &run_id[..run_id.len().min(12)]);
1268
1269    match client
1270        .create_item(
1271            &requests_space_path,
1272            &item_name,
1273            "workflow-run-request",
1274            metadata.clone(),
1275        )
1276        .await
1277    {
1278        Ok(item) => {
1279            if let Ok(rev) = client.create_revision(&item.kref, metadata).await {
1280                let _ = client.tag_revision(&rev.kref, "pending").await;
1281            }
1282            (
1283                StatusCode::OK,
1284                Json(serde_json::json!({
1285                    "run_id": run_id,
1286                    "workflow": name,
1287                    "status": "pending",
1288                })),
1289            )
1290                .into_response()
1291        }
1292        Err(e) => {
1293            tracing::warn!("Failed to create workflow run request: {e}");
1294            (
1295                StatusCode::INTERNAL_SERVER_ERROR,
1296                Json(serde_json::json!({
1297                    "error": format!("Failed to create run request: {e}")
1298                })),
1299            )
1300                .into_response()
1301        }
1302    }
1303}
1304
1305/// GET /api/workflows/revisions/{*kref}
1306///
1307/// Fetches a workflow definition pinned to a specific Kumiho revision kref
1308/// (e.g. `kref://Construct/Workflows/my-wf.workflow?r=3`). Used by the dashboard
1309/// DAG viewer to render the exact YAML a run executed, independent of whatever
1310/// is currently tagged `published` on the workflow item.
1311pub async fn handle_get_workflow_by_revision(
1312    State(state): State<AppState>,
1313    headers: HeaderMap,
1314    Path(kref): Path<String>,
1315) -> impl IntoResponse {
1316    if let Err(e) = require_auth(&state, &headers) {
1317        return e.into_response();
1318    }
1319
1320    let revision_kref = if kref.starts_with("kref://") {
1321        kref.clone()
1322    } else {
1323        format!("kref://{kref}")
1324    };
1325
1326    let client = build_kumiho_client(&state);
1327
1328    let mut rev = match client.get_revision(&revision_kref).await {
1329        Ok(r) => r,
1330        Err(e) => return kumiho_err(e).into_response(),
1331    };
1332
1333    // Canonical source for a pinned revision's YAML is the `workflow.yaml`
1334    // artifact on disk. The inline `definition` metadata key is a legacy
1335    // gateway-authored field and drifts from the artifact for operator-
1336    // authored revisions, so we always prefer the artifact and only fall
1337    // back to metadata when no artifact exists.
1338    if let Ok(artifact) = client
1339        .get_artifact_by_name(&rev.kref, "workflow.yaml")
1340        .await
1341    {
1342        let path = artifact
1343            .location
1344            .strip_prefix("file://")
1345            .unwrap_or(&artifact.location);
1346        if let Ok(yaml) = tokio::fs::read_to_string(path).await {
1347            rev.metadata.insert("definition".to_string(), yaml);
1348        }
1349    }
1350
1351    // Derive a minimal item from the revision's item_kref. The DAG viewer only
1352    // consumes `definition` (YAML) and `revision_number` from the response.
1353    let item_name = rev
1354        .item_kref
1355        .rsplit('/')
1356        .next()
1357        .map(|seg| {
1358            seg.rsplit_once('.')
1359                .map(|(n, _)| n)
1360                .unwrap_or(seg)
1361                .to_string()
1362        })
1363        .unwrap_or_default();
1364
1365    let item = ItemResponse {
1366        kref: rev.item_kref.clone(),
1367        name: item_name.clone(),
1368        item_name,
1369        kind: "workflow".to_string(),
1370        deprecated: false,
1371        created_at: rev.created_at.clone(),
1372        metadata: HashMap::new(),
1373    };
1374
1375    let workflow = to_workflow_response(&item, Some(&rev));
1376    Json(serde_json::json!({ "workflow": workflow })).into_response()
1377}
1378
1379// ── Run Handlers ────────────────────────────────────────────────────────
1380
1381/// GET /api/workflows/runs
1382pub async fn handle_list_workflow_runs(
1383    State(state): State<AppState>,
1384    headers: HeaderMap,
1385    Query(query): Query<WorkflowRunsQuery>,
1386) -> impl IntoResponse {
1387    if let Err(e) = require_auth(&state, &headers) {
1388        return e.into_response();
1389    }
1390
1391    let client = build_kumiho_client(&state);
1392    let project = workflow_project(&state);
1393    let runs_space = workflow_runs_space_path(&state);
1394
1395    match client.list_items(&runs_space, false).await {
1396        Ok(mut items) => {
1397            // Only include workflow_run kind items
1398            items.retain(|i| i.kind == "workflow_run");
1399
1400            if let Some(ref wf_name) = query.workflow {
1401                items.retain(|item| {
1402                    item.metadata
1403                        .get("workflow_name")
1404                        .or_else(|| item.metadata.get("workflow"))
1405                        .map(|n| n == wf_name)
1406                        .unwrap_or(false)
1407                });
1408            }
1409
1410            items.sort_by(|a, b| {
1411                let a_time = a.created_at.as_deref().unwrap_or("");
1412                let b_time = b.created_at.as_deref().unwrap_or("");
1413                b_time.cmp(a_time)
1414            });
1415            items.truncate(query.limit);
1416
1417            let krefs: Vec<String> = items.iter().map(|i| i.kref.clone()).collect();
1418            let rev_map = client
1419                .batch_get_revisions(&krefs, "latest")
1420                .await
1421                .unwrap_or_default();
1422
1423            let runs: Vec<WorkflowRunSummary> = items
1424                .iter()
1425                .map(|item| to_run_summary(item, rev_map.get(&item.kref)))
1426                .collect();
1427
1428            Json(serde_json::json!({ "runs": runs, "count": runs.len() })).into_response()
1429        }
1430        Err(ref e) if matches!(e, KumihoError::Api { status: 404, .. }) => {
1431            let _ = client.ensure_project(&project).await;
1432            let _ = client.ensure_space(&project, WORKFLOW_RUNS_SPACE_NAME).await;
1433            Json(serde_json::json!({ "runs": [], "count": 0 })).into_response()
1434        }
1435        Err(e) => {
1436            let msg = format!("Failed to fetch workflow runs: {e}");
1437            (
1438                StatusCode::SERVICE_UNAVAILABLE,
1439                Json(serde_json::json!({ "error": msg })),
1440            )
1441                .into_response()
1442        }
1443    }
1444}
1445
1446/// GET /api/workflows/runs/{run_id}
1447pub async fn handle_get_workflow_run(
1448    State(state): State<AppState>,
1449    headers: HeaderMap,
1450    Path(run_id): Path<String>,
1451) -> impl IntoResponse {
1452    if let Err(e) = require_auth(&state, &headers) {
1453        return e.into_response();
1454    }
1455
1456    let client = build_kumiho_client(&state);
1457    let project = workflow_project(&state);
1458    let runs_space = workflow_runs_space_path(&state);
1459
1460    // The item name format is "{workflow_name}-{run_id[:12]}", so the first
1461    // 12 characters of the run_id are always present in the item name.
1462    let run_id_prefix = &run_id[..run_id.len().min(12)];
1463
1464    // Strategy 1 (most reliable): filter items by name containing the run_id
1465    // prefix.  This avoids fulltext-search indexing lag and does not depend
1466    // on item metadata being returned in the list response.
1467    if let Ok(items) = client
1468        .list_items_filtered(&runs_space, run_id_prefix, false)
1469        .await
1470    {
1471        // Narrow to workflow_run kind items whose name actually contains the prefix
1472        let run_id_lower = run_id.to_lowercase();
1473        let prefix_lower = run_id_lower[..run_id_lower.len().min(12)].to_string();
1474        if let Some(item) = items.iter().find(|i| {
1475            i.kind == "workflow_run" && i.item_name.to_lowercase().contains(&prefix_lower)
1476        }) {
1477            let rev = client.get_latest_revision(&item.kref).await.ok();
1478            let detail = to_run_detail(item, rev.as_ref());
1479            return Json(serde_json::json!({ "run": detail })).into_response();
1480        }
1481    }
1482
1483    // Strategy 2: full-text search by run_id (may find it if indexed in item
1484    // metadata or if the run_id appears in the item name).
1485    if let Ok(results) = client
1486        .search_items(&run_id, &project, "workflow_run", false)
1487        .await
1488    {
1489        if let Some(sr) = results.first() {
1490            let rev = client.get_latest_revision(&sr.item.kref).await.ok();
1491            let detail = to_run_detail(&sr.item, rev.as_ref());
1492            return Json(serde_json::json!({ "run": detail })).into_response();
1493        }
1494    }
1495
1496    // Strategy 3: broad list + metadata/name match as last resort
1497    match client.list_items(&runs_space, false).await {
1498        Ok(items) => {
1499            let run_id_lower = run_id.to_lowercase();
1500            let found = items.iter().find(|item| {
1501                if item.kind != "workflow_run" {
1502                    return false;
1503                }
1504                // Match by metadata run_id (if metadata is returned)
1505                if let Some(meta_run_id) = item.metadata.get("run_id") {
1506                    if meta_run_id == &run_id {
1507                        return true;
1508                    }
1509                }
1510                // Match by item_name containing the run_id prefix (first 12 chars)
1511                let prefix = &run_id_lower[..run_id_lower.len().min(12)];
1512                item.item_name.to_lowercase().contains(prefix)
1513            });
1514
1515            match found {
1516                Some(item) => {
1517                    let rev = client.get_latest_revision(&item.kref).await.ok();
1518                    let detail = to_run_detail(item, rev.as_ref());
1519                    Json(serde_json::json!({ "run": detail })).into_response()
1520                }
1521                None => (
1522                    StatusCode::NOT_FOUND,
1523                    Json(serde_json::json!({ "error": format!("Run '{run_id}' not found") })),
1524                )
1525                    .into_response(),
1526            }
1527        }
1528        Err(e) => {
1529            let msg = format!("Kumiho error looking up run '{run_id}': {e}");
1530            tracing::warn!("{msg}");
1531            (
1532                StatusCode::SERVICE_UNAVAILABLE,
1533                Json(serde_json::json!({ "error": msg })),
1534            )
1535                .into_response()
1536        }
1537    }
1538}
1539
1540/// DELETE /api/workflows/runs/{run_id}
1541///
1542/// Deletes a workflow run from the WorkflowRuns space.  Finds the item by
1543/// run_id prefix matching (same strategy as the GET handler) then calls
1544/// `delete_item` on the resolved kref.
1545pub async fn handle_delete_workflow_run(
1546    State(state): State<AppState>,
1547    headers: HeaderMap,
1548    Path(run_id): Path<String>,
1549) -> impl IntoResponse {
1550    if let Err(e) = require_auth(&state, &headers) {
1551        return e.into_response();
1552    }
1553
1554    let client = build_kumiho_client(&state);
1555    let runs_space = workflow_runs_space_path(&state);
1556
1557    // Resolve the run item — reuse the same prefix-matching logic as the GET
1558    let run_id_prefix = &run_id[..run_id.len().min(12)];
1559
1560    let kref = if let Ok(items) = client
1561        .list_items_filtered(&runs_space, run_id_prefix, false)
1562        .await
1563    {
1564        let run_id_lower = run_id.to_lowercase();
1565        let prefix_lower = run_id_lower[..run_id_lower.len().min(12)].to_string();
1566        items
1567            .iter()
1568            .find(|i| {
1569                i.kind == "workflow_run" && i.item_name.to_lowercase().contains(&prefix_lower)
1570            })
1571            .map(|i| i.kref.clone())
1572    } else {
1573        None
1574    };
1575
1576    let kref = match kref {
1577        Some(k) => k,
1578        None => {
1579            return (
1580                StatusCode::NOT_FOUND,
1581                Json(serde_json::json!({ "error": format!("Run '{run_id}' not found") })),
1582            )
1583                .into_response();
1584        }
1585    };
1586
1587    match client.delete_item(&kref).await {
1588        Ok(()) => StatusCode::NO_CONTENT.into_response(),
1589        Err(e) => {
1590            let msg = format!("Failed to delete run '{run_id}': {e}");
1591            tracing::warn!("{msg}");
1592            kumiho_err(e).into_response()
1593        }
1594    }
1595}
1596
1597/// POST /api/workflows/runs/{run_id}/approve
1598///
1599/// Body: { "approved": bool, "feedback": string (optional) }
1600///
1601/// Approves or rejects a paused workflow step. Atomically claims the approval
1602/// from the registry to prevent race conditions with Discord.
1603pub async fn handle_approve_workflow_run(
1604    State(state): State<AppState>,
1605    headers: HeaderMap,
1606    Path(run_id): Path<String>,
1607    Json(body): Json<ApproveWorkflowBody>,
1608) -> impl IntoResponse {
1609    if let Err(e) = require_auth(&state, &headers) {
1610        return e.into_response();
1611    }
1612
1613    let approved = body.approved;
1614    let feedback = body.feedback.unwrap_or_default();
1615
1616    // Atomically claim the approval from the registry to prevent races with Discord.
1617    // If None returned, the registry may have been lost (gateway restart) — fall through
1618    // and call resume_workflow directly; the operator validates paused state itself.
1619    let claimed = state.approval_registry.try_claim(&run_id);
1620    let cwd = claimed
1621        .as_ref()
1622        .map(|a| a.cwd.clone())
1623        .unwrap_or_else(|| "/tmp".to_string());
1624
1625    if claimed.is_none() {
1626        tracing::info!(
1627            "approve_workflow_run: no registry entry for run_id={run_id} (gateway restart?), \
1628             calling resume_workflow directly"
1629        );
1630    }
1631
1632    // Call the operator MCP tool `resume_workflow`
1633    let tool_name = format!(
1634        "{}__resume_workflow",
1635        crate::agent::operator::OPERATOR_SERVER_NAME
1636    );
1637    let mut tool_args = serde_json::Map::new();
1638    tool_args.insert(
1639        "run_id".to_string(),
1640        serde_json::Value::String(run_id.clone()),
1641    );
1642    tool_args.insert("approved".to_string(), serde_json::Value::Bool(approved));
1643    tool_args.insert(
1644        "response".to_string(),
1645        serde_json::Value::String(feedback.clone()),
1646    );
1647    tool_args.insert("cwd".to_string(), serde_json::Value::String(cwd));
1648
1649    let mcp_result = if let Some(ref registry) = state.mcp_registry {
1650        let mcp_future = registry.call_tool(&tool_name, serde_json::Value::Object(tool_args));
1651        match tokio::time::timeout(std::time::Duration::from_secs(30), mcp_future).await {
1652            Ok(Ok(result_str)) => Ok(result_str),
1653            Ok(Err(e)) => Err(format!("operator tool call failed: {e:#}")),
1654            Err(_) => Err("operator tool call timed out (30s)".to_string()),
1655        }
1656    } else {
1657        Err("MCP registry not available — operator not connected".to_string())
1658    };
1659
1660    match mcp_result {
1661        Ok(_) => {
1662            // Broadcast a human_approval_resolved SSE event so connected dashboards
1663            // can update their UI immediately without waiting for the next REST poll.
1664            let _ = state.event_tx.send(serde_json::json!({
1665                "type": "human_approval_resolved",
1666                "run_id": run_id,
1667                "approved": approved,
1668                "timestamp": chrono::Utc::now().to_rfc3339(),
1669            }));
1670
1671            (
1672                StatusCode::OK,
1673                Json(serde_json::json!({
1674                    "status": "ok",
1675                    "message": if approved { "Workflow approved" } else { "Workflow rejected" },
1676                    "run_id": run_id,
1677                    "approved": approved,
1678                })),
1679            )
1680                .into_response()
1681        }
1682        Err(e) => {
1683            tracing::warn!("approve_workflow_run: failed for run_id={run_id}: {e}");
1684            (
1685                StatusCode::BAD_GATEWAY,
1686                Json(serde_json::json!({
1687                    "error": format!("Failed to resume workflow: {e}")
1688                })),
1689            )
1690                .into_response()
1691        }
1692    }
1693}
1694
1695#[derive(Deserialize)]
1696pub struct ApproveWorkflowBody {
1697    pub approved: bool,
1698    pub feedback: Option<String>,
1699}
1700
1701/// POST /api/workflows/runs/{run_id}/retry
1702///
1703/// Body: { "cwd": string (optional) }
1704///
1705/// Retries a failed workflow run from the first failed step. Successful step
1706/// outputs are preserved so only the failed step + downstream steps re-execute.
1707pub async fn handle_retry_workflow_run(
1708    State(state): State<AppState>,
1709    headers: HeaderMap,
1710    Path(run_id): Path<String>,
1711    body: Option<Json<RetryWorkflowBody>>,
1712) -> impl IntoResponse {
1713    if let Err(e) = require_auth(&state, &headers) {
1714        return e.into_response();
1715    }
1716
1717    let cwd = body
1718        .and_then(|Json(b)| b.cwd)
1719        .unwrap_or_else(|| "/tmp".to_string());
1720
1721    let tool_name = format!(
1722        "{}__retry_workflow",
1723        crate::agent::operator::OPERATOR_SERVER_NAME
1724    );
1725    let mut tool_args = serde_json::Map::new();
1726    tool_args.insert(
1727        "run_id".to_string(),
1728        serde_json::Value::String(run_id.clone()),
1729    );
1730    tool_args.insert("cwd".to_string(), serde_json::Value::String(cwd));
1731
1732    let mcp_result = if let Some(ref registry) = state.mcp_registry {
1733        let mcp_future = registry.call_tool(&tool_name, serde_json::Value::Object(tool_args));
1734        match tokio::time::timeout(std::time::Duration::from_secs(30), mcp_future).await {
1735            Ok(Ok(result_str)) => Ok(result_str),
1736            Ok(Err(e)) => Err(format!("operator tool call failed: {e:#}")),
1737            Err(_) => Err("operator tool call timed out (30s)".to_string()),
1738        }
1739    } else {
1740        Err("MCP registry not available — operator not connected".to_string())
1741    };
1742
1743    match mcp_result {
1744        Ok(result_str) => {
1745            let _ = state.event_tx.send(serde_json::json!({
1746                "type": "workflow_retry",
1747                "run_id": run_id,
1748                "timestamp": chrono::Utc::now().to_rfc3339(),
1749            }));
1750            let payload = serde_json::from_str::<serde_json::Value>(&result_str)
1751                .unwrap_or_else(|_| serde_json::json!({"raw": result_str}));
1752            (StatusCode::OK, Json(payload)).into_response()
1753        }
1754        Err(e) => {
1755            tracing::warn!("retry_workflow_run: failed for run_id={run_id}: {e}");
1756            (
1757                StatusCode::BAD_GATEWAY,
1758                Json(serde_json::json!({ "error": format!("Failed to retry workflow: {e}") })),
1759            )
1760                .into_response()
1761        }
1762    }
1763}
1764
1765#[derive(Deserialize)]
1766pub struct RetryWorkflowBody {
1767    pub cwd: Option<String>,
1768}
1769
1770/// GET /api/workflows/agent-activity/{agent_id}
1771///
1772/// Reads the RunLog JSONL file for an agent and returns structured activity data.
1773/// Used by the Live Execution View for on-demand drill-down into agent tool calls,
1774/// messages, and results.
1775pub async fn handle_agent_activity(
1776    State(state): State<AppState>,
1777    headers: HeaderMap,
1778    Path(agent_id): Path<String>,
1779    Query(query): Query<AgentActivityQuery>,
1780) -> impl IntoResponse {
1781    if let Err(e) = require_auth(&state, &headers) {
1782        return e.into_response();
1783    }
1784
1785    let runlogs_dir =
1786        std::path::PathBuf::from(std::env::var("HOME").unwrap_or_else(|_| "/tmp".into()))
1787            .join(".construct/operator_mcp/runlogs");
1788    let path = runlogs_dir.join(format!("{agent_id}.jsonl"));
1789
1790    if !path.exists() {
1791        return (
1792            StatusCode::NOT_FOUND,
1793            Json(serde_json::json!({ "error": "No run log found for this agent" })),
1794        )
1795            .into_response();
1796    }
1797
1798    let content = match std::fs::read_to_string(&path) {
1799        Ok(c) => c,
1800        Err(e) => {
1801            return (
1802                StatusCode::INTERNAL_SERVER_ERROR,
1803                Json(serde_json::json!({ "error": format!("Failed to read log: {e}") })),
1804            )
1805                .into_response();
1806        }
1807    };
1808
1809    let view = query.view.as_deref().unwrap_or("summary");
1810    let limit = query.limit.unwrap_or(100).min(500) as usize;
1811
1812    let entries: Vec<serde_json::Value> = content
1813        .lines()
1814        .filter_map(|line| serde_json::from_str(line).ok())
1815        .collect();
1816
1817    match view {
1818        "tool_calls" => {
1819            // Return tool calls with full args and results
1820            let tools: Vec<&serde_json::Value> = entries
1821                .iter()
1822                .filter(|e| e.get("kind").and_then(|v| v.as_str()) == Some("tool_call"))
1823                .collect();
1824            let total = tools.len();
1825            let slice: Vec<_> = tools.into_iter().rev().take(limit).collect();
1826            Json(serde_json::json!({
1827                "agent_id": agent_id,
1828                "view": "tool_calls",
1829                "total": total,
1830                "entries": slice,
1831            }))
1832            .into_response()
1833        }
1834        "messages" => {
1835            // Return assistant messages
1836            let msgs: Vec<&serde_json::Value> = entries
1837                .iter()
1838                .filter(|e| {
1839                    let kind = e.get("kind").and_then(|v| v.as_str()).unwrap_or("");
1840                    kind == "message" || kind == "user_message"
1841                })
1842                .collect();
1843            let total = msgs.len();
1844            let slice: Vec<_> = msgs.into_iter().rev().take(limit).collect();
1845            Json(serde_json::json!({
1846                "agent_id": agent_id,
1847                "view": "messages",
1848                "total": total,
1849                "entries": slice,
1850            }))
1851            .into_response()
1852        }
1853        "errors" => {
1854            let errs: Vec<&serde_json::Value> = entries
1855                .iter()
1856                .filter(|e| {
1857                    let kind = e.get("kind").and_then(|v| v.as_str()).unwrap_or("");
1858                    kind == "error"
1859                        || kind == "turn_failed"
1860                        || e.get("status").and_then(|v| v.as_str()) == Some("failed")
1861                })
1862                .collect();
1863            Json(serde_json::json!({
1864                "agent_id": agent_id,
1865                "view": "errors",
1866                "total": errs.len(),
1867                "entries": errs,
1868            }))
1869            .into_response()
1870        }
1871        "full" => {
1872            // Last N entries (most recent)
1873            let total = entries.len();
1874            let slice: Vec<_> = entries.into_iter().rev().take(limit).collect();
1875            Json(serde_json::json!({
1876                "agent_id": agent_id,
1877                "view": "full",
1878                "total": total,
1879                "entries": slice,
1880            }))
1881            .into_response()
1882        }
1883        _ => {
1884            // Summary view: header + stats + last message + recent tool calls
1885            let header = entries.first().cloned().unwrap_or_default();
1886            let tool_count = entries
1887                .iter()
1888                .filter(|e| e.get("kind").and_then(|v| v.as_str()) == Some("tool_call"))
1889                .count();
1890            let error_count = entries
1891                .iter()
1892                .filter(|e| {
1893                    let kind = e.get("kind").and_then(|v| v.as_str()).unwrap_or("");
1894                    kind == "error" || kind == "turn_failed"
1895                })
1896                .count();
1897            let last_message = entries
1898                .iter()
1899                .rev()
1900                .find(|e| e.get("kind").and_then(|v| v.as_str()) == Some("message"))
1901                .and_then(|e| e.get("text").and_then(|v| v.as_str()))
1902                .unwrap_or("");
1903            // Truncate to reasonable size for summary
1904            let last_msg_truncated = if last_message.len() > 5000 {
1905                &last_message[..5000]
1906            } else {
1907                last_message
1908            };
1909            // Recent tool calls (last 20)
1910            let recent_tools: Vec<_> = entries
1911                .iter()
1912                .filter(|e| e.get("kind").and_then(|v| v.as_str()) == Some("tool_call"))
1913                .rev()
1914                .take(20)
1915                .cloned()
1916                .collect();
1917            // Usage stats from turn_completed entries
1918            let mut input_tokens: u64 = 0;
1919            let mut output_tokens: u64 = 0;
1920            let mut total_cost: f64 = 0.0;
1921            for e in &entries {
1922                if e.get("kind").and_then(|v| v.as_str()) == Some("turn_completed") {
1923                    if let Some(usage) = e.get("usage") {
1924                        input_tokens += usage
1925                            .get("inputTokens")
1926                            .and_then(|v| v.as_u64())
1927                            .unwrap_or(0);
1928                        output_tokens += usage
1929                            .get("outputTokens")
1930                            .and_then(|v| v.as_u64())
1931                            .unwrap_or(0);
1932                        total_cost += usage
1933                            .get("totalCostUsd")
1934                            .and_then(|v| v.as_f64())
1935                            .unwrap_or(0.0);
1936                    }
1937                }
1938            }
1939            Json(serde_json::json!({
1940                "agent_id": agent_id,
1941                "view": "summary",
1942                "title": header.get("title").and_then(|v| v.as_str()).unwrap_or(""),
1943                "agent_type": header.get("agent_type").and_then(|v| v.as_str()).unwrap_or(""),
1944                "total_events": entries.len(),
1945                "tool_call_count": tool_count,
1946                "error_count": error_count,
1947                "last_message": last_msg_truncated,
1948                "recent_tools": recent_tools,
1949                "usage": {
1950                    "input_tokens": input_tokens,
1951                    "output_tokens": output_tokens,
1952                    "total_cost_usd": total_cost,
1953                },
1954            }))
1955            .into_response()
1956        }
1957    }
1958}
1959
1960#[derive(Deserialize)]
1961pub struct AgentActivityQuery {
1962    view: Option<String>,
1963    limit: Option<u32>,
1964}
1965
1966/// GET /api/workflows/dashboard
1967pub async fn handle_workflow_dashboard(
1968    State(state): State<AppState>,
1969    headers: HeaderMap,
1970) -> impl IntoResponse {
1971    if let Err(e) = require_auth(&state, &headers) {
1972        return e.into_response();
1973    }
1974
1975    let client = build_kumiho_client(&state);
1976    let space_path = workflow_space_path(&state);
1977    let runs_space = workflow_runs_space_path(&state);
1978
1979    // Fetch definitions from Kumiho + merge builtins
1980    let definitions = match client.list_items(&space_path, false).await {
1981        Ok(items) => merge_with_builtins(enrich_items(&client, items).await),
1982        Err(_) => merge_with_builtins(Vec::new()),
1983    };
1984    let definitions_count = definitions.len();
1985
1986    // Fetch recent runs from Kumiho
1987    let (recent_runs, total_runs) = match client.list_items(&runs_space, false).await {
1988        Ok(mut items) => {
1989            let total = items.len();
1990            items.sort_by(|a, b| {
1991                let a_time = a.created_at.as_deref().unwrap_or("");
1992                let b_time = b.created_at.as_deref().unwrap_or("");
1993                b_time.cmp(a_time)
1994            });
1995            items.truncate(5);
1996
1997            let krefs: Vec<String> = items.iter().map(|i| i.kref.clone()).collect();
1998            let rev_map = client
1999                .batch_get_revisions(&krefs, "latest")
2000                .await
2001                .unwrap_or_default();
2002
2003            let runs: Vec<WorkflowRunSummary> = items
2004                .iter()
2005                .map(|item| to_run_summary(item, rev_map.get(&item.kref)))
2006                .collect();
2007
2008            (runs, total)
2009        }
2010        Err(_) => (Vec::new(), 0),
2011    };
2012
2013    let active_runs = recent_runs
2014        .iter()
2015        .filter(|r| r.status == "running" || r.status == "paused")
2016        .count();
2017
2018    let dashboard = WorkflowDashboard {
2019        definitions_count,
2020        definitions,
2021        active_runs,
2022        recent_runs,
2023        total_runs,
2024    };
2025
2026    Json(serde_json::json!({ "dashboard": dashboard })).into_response()
2027}