Skip to main content

construct/gateway/
api_workflows.rs

1//! REST API handlers for workflow management (`/api/workflows`).
2//!
3//! Each workflow definition is a Kumiho item of kind `"workflow"` in the
4//! `Construct/Workflows` space.  The YAML definition and metadata (description,
5//! version, tags, steps count) are stored as revision metadata.
6//!
7//! Provides:
8//!   - `GET    /api/workflows`              — list workflow definitions
9//!   - `POST   /api/workflows`              — create a new workflow
10//!   - `PUT    /api/workflows/{*kref}`      — update an existing workflow
11//!   - `DELETE /api/workflows/{*kref}`       — delete a workflow
12//!   - `POST   /api/workflows/deprecate`    — toggle deprecation
13//!   - `GET    /api/workflows/runs`         — recent workflow runs (from Kumiho)
14//!   - `GET    /api/workflows/runs/{id}`    — single run detail
15//!   - `GET    /api/workflows/dashboard`    — aggregated stats
16
17use super::AppState;
18use super::api::require_auth;
19use super::api_agents::build_kumiho_client;
20use super::kumiho_client::{ItemResponse, KumihoClient, KumihoError, RevisionResponse, slugify};
21use axum::{
22    extract::{Path, Query, State},
23    http::{HeaderMap, StatusCode},
24    response::{IntoResponse, Json},
25};
26use parking_lot::Mutex;
27use serde::{Deserialize, Serialize};
28use std::collections::HashMap;
29use std::sync::OnceLock;
30use std::time::Instant;
31
32const WORKFLOW_SPACE_NAME: &str = "Workflows";
33const WORKFLOW_RUNS_SPACE_NAME: &str = "WorkflowRuns";
34const WORKFLOW_RUN_REQUESTS_SPACE_NAME: &str = "WorkflowRunRequests";
35
36fn workflow_project(state: &AppState) -> String {
37    state.config.lock().kumiho.harness_project.clone()
38}
39
40fn workflow_space_path(state: &AppState) -> String {
41    format!("/{}/{}", workflow_project(state), WORKFLOW_SPACE_NAME)
42}
43
44fn workflow_runs_space_path(state: &AppState) -> String {
45    format!("/{}/{}", workflow_project(state), WORKFLOW_RUNS_SPACE_NAME)
46}
47
48fn workflow_run_requests_space_path(state: &AppState) -> String {
49    format!(
50        "/{}/{}",
51        workflow_project(state),
52        WORKFLOW_RUN_REQUESTS_SPACE_NAME
53    )
54}
55
56// ── Response cache ──────────────────────────────────────────────────────
57
58struct WorkflowCache {
59    workflows: Vec<WorkflowResponse>,
60    include_deprecated: bool,
61    fetched_at: Instant,
62}
63
64static WORKFLOW_CACHE: OnceLock<Mutex<Option<WorkflowCache>>> = OnceLock::new();
65const CACHE_TTL_SECS: u64 = 3;
66
67fn get_cached(include_deprecated: bool) -> Option<Vec<WorkflowResponse>> {
68    let lock = WORKFLOW_CACHE.get_or_init(|| Mutex::new(None));
69    let cache = lock.lock();
70    if let Some(ref c) = *cache {
71        if c.include_deprecated == include_deprecated
72            && c.fetched_at.elapsed().as_secs() < CACHE_TTL_SECS
73        {
74            return Some(c.workflows.clone());
75        }
76    }
77    None
78}
79
80fn set_cached(workflows: &[WorkflowResponse], include_deprecated: bool) {
81    let lock = WORKFLOW_CACHE.get_or_init(|| Mutex::new(None));
82    let mut cache = lock.lock();
83    *cache = Some(WorkflowCache {
84        workflows: workflows.to_vec(),
85        include_deprecated,
86        fetched_at: Instant::now(),
87    });
88}
89
90fn invalidate_cache() {
91    if let Some(lock) = WORKFLOW_CACHE.get() {
92        let mut cache = lock.lock();
93        // Mark as expired but keep stale data for fallback on API errors
94        if let Some(ref mut c) = *cache {
95            c.fetched_at = Instant::now() - std::time::Duration::from_secs(CACHE_TTL_SECS + 1);
96        }
97    }
98}
99
100// ── Query / request types ───────────────────────────────────────────────
101
102#[derive(Deserialize)]
103pub struct WorkflowListQuery {
104    #[serde(default)]
105    pub include_deprecated: bool,
106    pub q: Option<String>,
107}
108
109#[derive(Deserialize)]
110pub struct CreateWorkflowBody {
111    pub name: String,
112    pub description: String,
113    pub definition: String,
114    #[serde(default)]
115    pub version: Option<String>,
116    #[serde(default)]
117    pub tags: Option<Vec<String>>,
118}
119
120#[derive(Deserialize)]
121pub struct DeprecateBody {
122    pub kref: String,
123    pub deprecated: bool,
124}
125
126#[derive(Deserialize)]
127pub struct WorkflowRunsQuery {
128    #[serde(default = "default_limit")]
129    pub limit: usize,
130    #[serde(default)]
131    pub workflow: Option<String>,
132}
133
134fn default_limit() -> usize {
135    20
136}
137
138#[derive(Deserialize)]
139pub struct RunWorkflowBody {
140    #[serde(default)]
141    pub inputs: serde_json::Value,
142    #[serde(default)]
143    pub cwd: Option<String>,
144}
145
146// ── Response types ──────────────────────────────────────────────────────
147
148#[derive(Serialize, Clone)]
149pub struct WorkflowResponse {
150    pub kref: String,
151    pub name: String,
152    pub item_name: String,
153    pub deprecated: bool,
154    pub created_at: Option<String>,
155    pub description: String,
156    pub definition: String,
157    pub version: String,
158    pub tags: Vec<String>,
159    pub steps: usize,
160    pub revision_number: i32,
161    /// `"builtin"` — shipped with Construct, not yet customized.
162    /// `"builtin-modified"` — builtin overridden by a Kumiho copy.
163    /// `"custom"` — user-created workflow.
164    #[serde(default = "default_source")]
165    pub source: String,
166    #[serde(skip_serializing_if = "Vec::is_empty")]
167    pub triggers: Vec<WorkflowTrigger>,
168}
169
170fn default_source() -> String {
171    "custom".to_string()
172}
173
174#[derive(Serialize, Clone, Debug)]
175pub struct WorkflowTrigger {
176    pub on_kind: String,
177    pub on_tag: String,
178    #[serde(skip_serializing_if = "String::is_empty")]
179    pub on_name_pattern: String,
180}
181
182#[derive(Serialize, Clone)]
183pub struct WorkflowRunSummary {
184    pub kref: String,
185    pub run_id: String,
186    pub workflow_name: String,
187    pub status: String,
188    pub started_at: String,
189    pub completed_at: String,
190    pub steps_completed: String,
191    pub steps_total: String,
192    pub error: String,
193    /// Kumiho item kref of the workflow definition this run used.
194    /// Empty for built-in / disk-fallback workflows.
195    #[serde(default, skip_serializing_if = "String::is_empty")]
196    pub workflow_item_kref: String,
197    /// Kumiho revision kref of the exact workflow YAML this run executed.
198    /// The dashboard DAG viewer fetches this revision so the rendered graph
199    /// always matches what the run actually ran — independent of later retags.
200    #[serde(default, skip_serializing_if = "String::is_empty")]
201    pub workflow_revision_kref: String,
202}
203
204#[derive(Serialize, Clone)]
205pub struct TranscriptEntry {
206    pub speaker: String,
207    pub content: String,
208    pub round: u32,
209}
210
211#[derive(Serialize, Clone, Default)]
212pub struct ApprovalOutputData {
213    #[serde(skip_serializing_if = "Option::is_none")]
214    pub awaiting_approval: Option<bool>,
215    #[serde(skip_serializing_if = "Option::is_none")]
216    pub approval_message: Option<String>,
217    #[serde(skip_serializing_if = "Vec::is_empty")]
218    pub approve_keywords: Vec<String>,
219    #[serde(skip_serializing_if = "Vec::is_empty")]
220    pub reject_keywords: Vec<String>,
221}
222
223#[derive(Serialize, Clone)]
224pub struct WorkflowStepDetail {
225    pub step_id: String,
226    pub status: String,
227    #[serde(skip_serializing_if = "String::is_empty")]
228    pub agent_id: String,
229    #[serde(skip_serializing_if = "String::is_empty")]
230    pub agent_type: String,
231    #[serde(skip_serializing_if = "String::is_empty")]
232    pub role: String,
233    #[serde(skip_serializing_if = "String::is_empty")]
234    pub template_name: String,
235    #[serde(skip_serializing_if = "String::is_empty")]
236    pub output_preview: String,
237    #[serde(skip_serializing_if = "String::is_empty")]
238    pub artifact_path: String,
239    #[serde(skip_serializing_if = "Vec::is_empty")]
240    pub skills: Vec<String>,
241    #[serde(skip_serializing_if = "Vec::is_empty")]
242    pub transcript: Vec<TranscriptEntry>,
243    #[serde(skip_serializing_if = "Option::is_none")]
244    pub output_data: Option<ApprovalOutputData>,
245}
246
247#[derive(Serialize, Clone)]
248pub struct WorkflowRunDetail {
249    #[serde(flatten)]
250    pub summary: WorkflowRunSummary,
251    pub steps: Vec<WorkflowStepDetail>,
252}
253
254#[derive(Serialize)]
255pub struct WorkflowDashboard {
256    pub definitions_count: usize,
257    pub definitions: Vec<WorkflowResponse>,
258    pub active_runs: usize,
259    pub recent_runs: Vec<WorkflowRunSummary>,
260    pub total_runs: usize,
261}
262
263// ── Helpers ─────────────────────────────────────────────────────────────
264
265fn kumiho_err(e: KumihoError) -> (StatusCode, Json<serde_json::Value>) {
266    match &e {
267        KumihoError::Unreachable(_) => (
268            StatusCode::SERVICE_UNAVAILABLE,
269            Json(serde_json::json!({ "error": format!("Kumiho service unavailable: {e}") })),
270        ),
271        KumihoError::Api { status, body } => {
272            let code = if *status == 401 || *status == 403 {
273                StatusCode::BAD_GATEWAY
274            } else {
275                StatusCode::from_u16(*status).unwrap_or(StatusCode::BAD_GATEWAY)
276            };
277            (
278                code,
279                Json(serde_json::json!({ "error": format!("Kumiho upstream: {body}") })),
280            )
281        }
282        KumihoError::Decode(msg) => (
283            StatusCode::BAD_GATEWAY,
284            Json(serde_json::json!({ "error": format!("Bad response from Kumiho: {msg}") })),
285        ),
286    }
287}
288
289fn workflow_metadata(body: &CreateWorkflowBody) -> HashMap<String, String> {
290    let mut meta = HashMap::new();
291    meta.insert("display_name".to_string(), body.name.clone());
292    meta.insert("description".to_string(), body.description.clone());
293    meta.insert("definition".to_string(), body.definition.clone());
294    meta.insert("created_by".to_string(), "construct-dashboard".to_string());
295    // Count steps in the YAML
296    let steps = count_yaml_steps(&body.definition);
297    meta.insert("steps".to_string(), steps.to_string());
298    if let Some(ref tags) = body.tags {
299        if !tags.is_empty() {
300            meta.insert("tags".to_string(), tags.join(","));
301        }
302    }
303    // Full-text search index
304    meta.insert(
305        "_search_text".to_string(),
306        format!("{} {}", body.name, body.description),
307    );
308    meta
309}
310
311fn count_yaml_steps(content: &str) -> usize {
312    let mut count = 0;
313    let mut in_steps = false;
314    for line in content.lines() {
315        let trimmed = line.trim();
316        if trimmed == "steps:" || trimmed == "tasks:" {
317            in_steps = true;
318            continue;
319        }
320        if in_steps {
321            if trimmed.starts_with("- id:") {
322                count += 1;
323            }
324            if !trimmed.is_empty()
325                && !trimmed.starts_with('-')
326                && !trimmed.starts_with(' ')
327                && !trimmed.starts_with('#')
328                && !line.starts_with(' ')
329            {
330                break;
331            }
332        }
333    }
334    count
335}
336
337fn to_workflow_response(item: &ItemResponse, rev: Option<&RevisionResponse>) -> WorkflowResponse {
338    let meta = rev.map(|r| &r.metadata);
339    let get = |key: &str| -> String { meta.and_then(|m| m.get(key)).cloned().unwrap_or_default() };
340    let tags_str = get("tags");
341    let tags: Vec<String> = if tags_str.is_empty() {
342        Vec::new()
343    } else {
344        tags_str.split(',').map(|s| s.trim().to_string()).collect()
345    };
346    let steps: usize = get("steps").parse().unwrap_or(0);
347
348    let display_name = {
349        let n = get("display_name");
350        if n.is_empty() {
351            item.item_name.clone()
352        } else {
353            n
354        }
355    };
356
357    let definition = get("definition");
358    let triggers = extract_triggers(&definition);
359
360    WorkflowResponse {
361        kref: item.kref.clone(),
362        name: display_name,
363        item_name: item.item_name.clone(),
364        deprecated: item.deprecated,
365        created_at: item.created_at.clone(),
366        description: get("description"),
367        definition,
368        version: format!("{}", rev.map(|r| r.number).unwrap_or(0)),
369        tags,
370        steps,
371        revision_number: rev.map(|r| r.number).unwrap_or(0),
372        source: "custom".to_string(),
373        triggers,
374    }
375}
376
377/// Prefer the `workflow.yaml` artifact on disk as the canonical definition,
378/// falling back to inline `definition` metadata only when no artifact exists.
379///
380/// The inline `definition` metadata is a legacy gateway-authored field that
381/// drifts from the artifact for operator-authored revisions and can also be
382/// truncated by Kumiho's batch endpoint for large YAMLs. The artifact file is
383/// the source of truth, so we always overwrite metadata with it when present.
384async fn prefer_artifact_definitions(
385    client: &super::kumiho_client::KumihoClient,
386    revs: &mut HashMap<String, RevisionResponse>,
387) {
388    for rev in revs.values_mut() {
389        if let Ok(artifact) = client
390            .get_artifact_by_name(&rev.kref, "workflow.yaml")
391            .await
392        {
393            let path = artifact
394                .location
395                .strip_prefix("file://")
396                .unwrap_or(&artifact.location);
397            if let Ok(yaml) = tokio::fs::read_to_string(path).await {
398                rev.metadata.insert("definition".to_string(), yaml);
399            }
400        }
401    }
402}
403
404async fn enrich_items(
405    client: &super::kumiho_client::KumihoClient,
406    items: Vec<ItemResponse>,
407) -> Vec<WorkflowResponse> {
408    // Only include items with kind == "workflow" — filter out stray items
409    // that agents may have created in the Workflows space.
410    let items: Vec<ItemResponse> = items.into_iter().filter(|i| i.kind == "workflow").collect();
411
412    if items.is_empty() {
413        return Vec::new();
414    }
415
416    let krefs: Vec<String> = items.iter().map(|i| i.kref.clone()).collect();
417
418    if let Ok(mut rev_map) = client.batch_get_revisions(&krefs, "published").await {
419        let missing: Vec<String> = krefs
420            .iter()
421            .filter(|k| !rev_map.contains_key(*k))
422            .cloned()
423            .collect();
424        let mut latest_map = if !missing.is_empty() {
425            client
426                .batch_get_revisions(&missing, "latest")
427                .await
428                .unwrap_or_default()
429        } else {
430            HashMap::new()
431        };
432
433        // Artifact-first: the `workflow.yaml` on disk is canonical. The inline
434        // `definition` metadata drifts for operator-authored revisions and is
435        // truncated by Kumiho's batch endpoint for large YAMLs, so we always
436        // prefer the artifact when it exists — same logic the single-revision
437        // endpoint uses.
438        prefer_artifact_definitions(client, &mut rev_map).await;
439        prefer_artifact_definitions(client, &mut latest_map).await;
440
441        return items
442            .iter()
443            .map(|item| {
444                let rev = rev_map
445                    .get(&item.kref)
446                    .or_else(|| latest_map.get(&item.kref));
447                to_workflow_response(item, rev)
448            })
449            .collect();
450    }
451
452    // Fallback: sequential
453    let mut workflows = Vec::with_capacity(items.len());
454    for item in &items {
455        let rev = client.get_published_or_latest(&item.kref).await.ok();
456        workflows.push(to_workflow_response(item, rev.as_ref()));
457    }
458    workflows
459}
460
461fn to_run_summary(item: &ItemResponse, rev: Option<&RevisionResponse>) -> WorkflowRunSummary {
462    let meta = rev.map(|r| &r.metadata);
463    let get = |key: &str| -> String { meta.and_then(|m| m.get(key)).cloned().unwrap_or_default() };
464
465    let run_id_meta = get("run_id");
466    WorkflowRunSummary {
467        kref: item.kref.clone(),
468        run_id: if run_id_meta.is_empty() {
469            item.item_name.clone()
470        } else {
471            run_id_meta
472        },
473        workflow_name: {
474            let wn = get("workflow_name");
475            if wn.is_empty() { get("workflow") } else { wn }
476        },
477        status: get("status"),
478        started_at: get("started_at"),
479        completed_at: get("completed_at"),
480        steps_completed: get("steps_completed"),
481        steps_total: get("steps_total"),
482        error: get("error"),
483        workflow_item_kref: get("workflow_item_kref"),
484        workflow_revision_kref: get("workflow_revision_kref"),
485    }
486}
487
488fn extract_steps_from_metadata(meta: &HashMap<String, String>) -> Vec<WorkflowStepDetail> {
489    // Skip known non-step metadata keys that happen to start with "step_"
490    const SKIP_KEYS: &[&str] = &["step_count", "steps_completed", "steps_total"];
491
492    let mut steps = Vec::new();
493    for (key, value) in meta {
494        if SKIP_KEYS.contains(&key.as_str()) {
495            continue;
496        }
497        if let Some(step_id) = key.strip_prefix("step_") {
498            // Value should be JSON object: {"status":"completed","output_preview":"...","agent_id":"..."}
499            // Legacy runs may have truncated JSON — fall back to regex extraction.
500            if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(value) {
501                // Only accept JSON objects (skip plain numbers/strings)
502                if !parsed.is_object() {
503                    continue;
504                }
505                let skills = parsed
506                    .get("skills")
507                    .and_then(|v| v.as_array())
508                    .map(|arr| {
509                        arr.iter()
510                            .filter_map(|s| s.as_str().map(|s| s.to_string()))
511                            .collect::<Vec<_>>()
512                    })
513                    .unwrap_or_default();
514                // Group chat transcript — stored as JSON string of array
515                let transcript = parsed
516                    .get("transcript")
517                    .and_then(|v| v.as_str())
518                    .and_then(|s| serde_json::from_str::<Vec<serde_json::Value>>(s).ok())
519                    .map(|arr| {
520                        arr.iter()
521                            .map(|entry| TranscriptEntry {
522                                speaker: entry
523                                    .get("speaker")
524                                    .and_then(|v| v.as_str())
525                                    .unwrap_or("?")
526                                    .to_string(),
527                                content: entry
528                                    .get("content")
529                                    .and_then(|v| v.as_str())
530                                    .unwrap_or("")
531                                    .to_string(),
532                                round: entry.get("round").and_then(|v| v.as_u64()).unwrap_or(0)
533                                    as u32,
534                            })
535                            .collect::<Vec<_>>()
536                    })
537                    .unwrap_or_default();
538                // Decode output_data for approval steps
539                let output_data = parsed.get("output_data").and_then(|v| {
540                    // output_data may be a JSON string or an embedded object
541                    let obj = if let Some(s) = v.as_str() {
542                        serde_json::from_str::<serde_json::Value>(s).ok()
543                    } else {
544                        Some(v.clone())
545                    };
546                    obj.map(|o| ApprovalOutputData {
547                        awaiting_approval: o.get("awaiting_approval").and_then(|v| v.as_bool()),
548                        approval_message: o
549                            .get("approval_message")
550                            .and_then(|v| v.as_str())
551                            .map(String::from),
552                        approve_keywords: o
553                            .get("approve_keywords")
554                            .and_then(|v| v.as_array())
555                            .map(|arr| {
556                                arr.iter()
557                                    .filter_map(|s| s.as_str().map(String::from))
558                                    .collect()
559                            })
560                            .unwrap_or_default(),
561                        reject_keywords: o
562                            .get("reject_keywords")
563                            .and_then(|v| v.as_array())
564                            .map(|arr| {
565                                arr.iter()
566                                    .filter_map(|s| s.as_str().map(String::from))
567                                    .collect()
568                            })
569                            .unwrap_or_default(),
570                    })
571                });
572                steps.push(WorkflowStepDetail {
573                    step_id: step_id.to_string(),
574                    status: parsed
575                        .get("status")
576                        .and_then(|v| v.as_str())
577                        .unwrap_or("unknown")
578                        .to_string(),
579                    agent_id: parsed
580                        .get("agent_id")
581                        .and_then(|v| v.as_str())
582                        .unwrap_or("")
583                        .to_string(),
584                    agent_type: parsed
585                        .get("agent_type")
586                        .and_then(|v| v.as_str())
587                        .unwrap_or("")
588                        .to_string(),
589                    role: parsed
590                        .get("role")
591                        .and_then(|v| v.as_str())
592                        .unwrap_or("")
593                        .to_string(),
594                    template_name: parsed
595                        .get("template_name")
596                        .and_then(|v| v.as_str())
597                        .unwrap_or("")
598                        .to_string(),
599                    output_preview: parsed
600                        .get("output_preview")
601                        .and_then(|v| v.as_str())
602                        .unwrap_or("")
603                        .to_string(),
604                    artifact_path: parsed
605                        .get("artifact_path")
606                        .and_then(|v| v.as_str())
607                        .unwrap_or("")
608                        .to_string(),
609                    skills,
610                    transcript,
611                    output_data,
612                });
613            } else if value.contains(r#""status""#) {
614                // Truncated JSON fallback: extract status with simple string search
615                let status = if let Some(start) = value.find(r#""status": ""#) {
616                    let rest = &value[start + 11..];
617                    rest.split('"').next().unwrap_or("unknown")
618                } else {
619                    "unknown"
620                };
621                steps.push(WorkflowStepDetail {
622                    step_id: step_id.to_string(),
623                    status: status.to_string(),
624                    agent_id: String::new(),
625                    agent_type: String::new(),
626                    role: String::new(),
627                    template_name: String::new(),
628                    output_preview: String::new(),
629                    artifact_path: String::new(),
630                    skills: Vec::new(),
631                    transcript: Vec::new(),
632                    output_data: None,
633                });
634            }
635        }
636    }
637    steps
638}
639
640fn to_run_detail(item: &ItemResponse, rev: Option<&RevisionResponse>) -> WorkflowRunDetail {
641    let summary = to_run_summary(item, rev);
642    let steps = rev
643        .map(|r| extract_steps_from_metadata(&r.metadata))
644        .unwrap_or_default();
645    WorkflowRunDetail { summary, steps }
646}
647
648// ── Builtin workflow discovery ──────────────────────────────────────────
649
650/// Default directory containing builtin workflow YAML files.
651const BUILTIN_WORKFLOWS_DIR: &str = ".construct/operator_mcp/workflow/builtins";
652
653/// Discover builtin workflow YAML files from `~/BUILTIN_WORKFLOWS_DIR`.
654///
655/// Returns a vec of `WorkflowResponse` entries with `source = "builtin"`.
656fn discover_builtin_workflows() -> Vec<WorkflowResponse> {
657    let home = directories::UserDirs::new()
658        .map(|u| u.home_dir().to_path_buf())
659        .unwrap_or_default();
660    let builtins_dir = home.join(BUILTIN_WORKFLOWS_DIR);
661    let Ok(entries) = std::fs::read_dir(&builtins_dir) else {
662        return Vec::new();
663    };
664
665    let mut workflows = Vec::new();
666    for entry in entries.flatten() {
667        let path = entry.path();
668        let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
669        if ext != "yaml" && ext != "yml" {
670            continue;
671        }
672        let Ok(content) = std::fs::read_to_string(&path) else {
673            continue;
674        };
675        // Extract name, description, tags from YAML frontmatter (lightweight parse)
676        let name = extract_yaml_field(&content, "name").unwrap_or_else(|| {
677            path.file_stem()
678                .unwrap_or_default()
679                .to_string_lossy()
680                .into_owned()
681        });
682        let description = extract_yaml_field(&content, "description").unwrap_or_default();
683        let version = extract_yaml_field(&content, "version").unwrap_or_else(|| "1.0".into());
684        let tags_str = extract_yaml_field(&content, "tags").unwrap_or_default();
685        let tags: Vec<String> = if tags_str.is_empty() {
686            Vec::new()
687        } else {
688            // Parse [tag1, tag2] format
689            tags_str
690                .trim_start_matches('[')
691                .trim_end_matches(']')
692                .split(',')
693                .map(|s| s.trim().trim_matches('"').trim_matches('\'').to_string())
694                .filter(|s| !s.is_empty())
695                .collect()
696        };
697        let steps = count_yaml_steps(&content);
698        let item_name = slugify(&name);
699
700        let triggers = extract_triggers(&content);
701        workflows.push(WorkflowResponse {
702            kref: format!("builtin://{item_name}"),
703            name,
704            item_name,
705            deprecated: false,
706            created_at: None,
707            description,
708            definition: content,
709            version,
710            tags,
711            steps,
712            revision_number: 0,
713            source: "builtin".to_string(),
714            triggers,
715        });
716    }
717    workflows
718}
719
720/// Extract a top-level scalar field from YAML content (lightweight, no full parser).
721fn extract_yaml_field(content: &str, field: &str) -> Option<String> {
722    for line in content.lines() {
723        let trimmed = line.trim();
724        if let Some(rest) = trimmed.strip_prefix(field) {
725            if let Some(value) = rest.strip_prefix(':') {
726                let v = value.trim();
727                // Strip quotes
728                let v = v.trim_matches('"').trim_matches('\'');
729                if !v.is_empty() {
730                    return Some(v.to_string());
731                }
732            }
733        }
734        // Stop at steps/inputs — only look at frontmatter
735        if trimmed == "steps:" || trimmed == "inputs:" {
736            break;
737        }
738    }
739    None
740}
741
742/// Extract trigger definitions from a YAML workflow definition (lightweight, no full parser).
743///
744/// Expects a `triggers:` top-level key containing a list of mappings with `on_kind`,
745/// optional `on_tag` (defaults to `"ready"`), and optional `on_name_pattern`.
746fn extract_triggers(content: &str) -> Vec<WorkflowTrigger> {
747    let mut triggers = Vec::new();
748    let mut in_triggers = false;
749    let mut current_kind = String::new();
750    let mut current_tag = String::new();
751    let mut current_pattern = String::new();
752
753    for line in content.lines() {
754        let trimmed = line.trim();
755        if trimmed == "triggers:" {
756            in_triggers = true;
757            continue;
758        }
759        if !in_triggers {
760            continue;
761        }
762        // A non-indented, non-empty, non-comment line means we left the triggers block
763        if !trimmed.is_empty()
764            && !trimmed.starts_with('-')
765            && !trimmed.starts_with('#')
766            && !line.starts_with(' ')
767            && !line.starts_with('\t')
768        {
769            break;
770        }
771        // New list item — flush previous if any
772        if trimmed.starts_with("- ") {
773            if !current_kind.is_empty() {
774                triggers.push(WorkflowTrigger {
775                    on_kind: std::mem::take(&mut current_kind),
776                    on_tag: if current_tag.is_empty() {
777                        "ready".to_string()
778                    } else {
779                        std::mem::take(&mut current_tag)
780                    },
781                    on_name_pattern: std::mem::take(&mut current_pattern),
782                });
783            }
784            // Parse inline key on the `- ` line (e.g. `- on_kind: model`)
785            let after_dash = trimmed.strip_prefix("- ").unwrap_or("");
786            if let Some((k, v)) = after_dash.split_once(':') {
787                let k = k.trim();
788                let v = v.trim().trim_matches('"').trim_matches('\'');
789                match k {
790                    "on_kind" => current_kind = v.to_string(),
791                    "on_tag" => current_tag = v.to_string(),
792                    "on_name_pattern" => current_pattern = v.to_string(),
793                    _ => {}
794                }
795            }
796            continue;
797        }
798        // Continuation key within a list item
799        if let Some((k, v)) = trimmed.split_once(':') {
800            let k = k.trim();
801            let v = v.trim().trim_matches('"').trim_matches('\'');
802            match k {
803                "on_kind" => current_kind = v.to_string(),
804                "on_tag" => current_tag = v.to_string(),
805                "on_name_pattern" => current_pattern = v.to_string(),
806                _ => {}
807            }
808        }
809    }
810    // Flush last trigger
811    if !current_kind.is_empty() {
812        triggers.push(WorkflowTrigger {
813            on_kind: current_kind,
814            on_tag: if current_tag.is_empty() {
815                "ready".to_string()
816            } else {
817                current_tag
818            },
819            on_name_pattern: current_pattern,
820        });
821    }
822    triggers
823}
824
825/// Extract cron trigger expressions from a workflow YAML definition (lightweight, no full parser).
826///
827/// Expects a `triggers:` top-level key containing list items with a `cron:` field and optional
828/// `timezone:` field.  Returns `Vec<(cron_expression, optional_timezone)>`.
829fn extract_cron_triggers(content: &str) -> Vec<(String, Option<String>)> {
830    let mut results = Vec::new();
831    let mut in_triggers = false;
832    let mut current_cron = String::new();
833    let mut current_tz: Option<String> = None;
834
835    for line in content.lines() {
836        let trimmed = line.trim();
837        if trimmed == "triggers:" {
838            in_triggers = true;
839            continue;
840        }
841        if !in_triggers {
842            continue;
843        }
844        // A non-indented, non-empty, non-comment line means we left the triggers block
845        if !trimmed.is_empty()
846            && !trimmed.starts_with('-')
847            && !trimmed.starts_with('#')
848            && !line.starts_with(' ')
849            && !line.starts_with('\t')
850        {
851            break;
852        }
853        // New list item — flush previous if any
854        if trimmed.starts_with("- ") {
855            if !current_cron.is_empty() {
856                results.push((std::mem::take(&mut current_cron), current_tz.take()));
857            }
858            // Parse inline key on the `- ` line (e.g. `- cron: "0 9 * * *"`)
859            let after_dash = trimmed.strip_prefix("- ").unwrap_or("");
860            if let Some((k, v)) = after_dash.split_once(':') {
861                let k = k.trim();
862                let v = v.trim().trim_matches('"').trim_matches('\'');
863                match k {
864                    "cron" if !v.is_empty() => current_cron = v.to_string(),
865                    "timezone" | "tz" if !v.is_empty() => current_tz = Some(v.to_string()),
866                    _ => {}
867                }
868            }
869            continue;
870        }
871        // Continuation key within a list item
872        if let Some((k, v)) = trimmed.split_once(':') {
873            let k = k.trim();
874            let v = v.trim().trim_matches('"').trim_matches('\'');
875            match k {
876                "cron" if !v.is_empty() => current_cron = v.to_string(),
877                "timezone" | "tz" if !v.is_empty() => current_tz = Some(v.to_string()),
878                _ => {}
879            }
880        }
881    }
882    // Flush last trigger
883    if !current_cron.is_empty() {
884        results.push((current_cron, current_tz));
885    }
886    results
887}
888
889/// Sync cron triggers for a single workflow to the cron scheduler.
890///
891/// Removes any existing cron jobs for this workflow and re-creates them from
892/// the triggers found in the current YAML definition.
893/// Write the workflow YAML to ~/.construct/workflows/ and register a Kumiho artifact.
894async fn persist_workflow_artifact(
895    client: &KumihoClient,
896    revision_kref: &str,
897    revision_number: i32,
898    workflow_name: &str,
899    definition: &str,
900) {
901    let home = directories::UserDirs::new()
902        .map(|u| u.home_dir().to_path_buf())
903        .unwrap_or_default();
904    let dir = home.join(".construct/workflows");
905    let _ = tokio::fs::create_dir_all(&dir).await;
906
907    let slug = slugify(workflow_name);
908    let file_path = dir.join(format!("{slug}.r{revision_number}.yaml"));
909    let location = format!("file://{}", file_path.display());
910
911    if let Err(e) = tokio::fs::write(&file_path, definition).await {
912        tracing::warn!("Failed to write workflow YAML for {workflow_name}: {e}");
913        return;
914    }
915
916    if let Err(e) = client
917        .create_artifact(revision_kref, "workflow.yaml", &location, HashMap::new())
918        .await
919    {
920        tracing::warn!("Failed to create artifact for workflow {workflow_name}: {e}");
921    } else {
922        tracing::info!("Persisted workflow artifact: {location}");
923    }
924}
925
926fn sync_cron_for_workflow(state: &AppState, workflow_name: &str, definition: &str) {
927    let cron_triggers = extract_cron_triggers(definition);
928    let config = state.config.lock();
929
930    // Remove existing cron jobs for this workflow first
931    if let Err(e) = crate::cron::remove_workflow_cron_jobs(&config, workflow_name) {
932        tracing::warn!("Failed to remove old cron jobs for workflow {workflow_name}: {e}");
933    }
934
935    if cron_triggers.is_empty() {
936        return;
937    }
938
939    let wf_crons: Vec<(String, String, Option<String>)> = cron_triggers
940        .into_iter()
941        .map(|(expr, tz)| (workflow_name.to_string(), expr, tz))
942        .collect();
943
944    if let Err(e) = crate::cron::sync_workflow_cron_jobs(&config, &wf_crons) {
945        tracing::warn!("Failed to sync cron triggers for workflow {workflow_name}: {e}");
946    }
947}
948
949/// Merge builtin workflows with Kumiho workflows.
950///
951/// - Builtins whose `item_name` matches a Kumiho item are marked `"builtin-modified"`.
952/// - Unmatched builtins are included as `"builtin"`.
953/// - Kumiho-only workflows remain `"custom"`.
954fn merge_with_builtins(mut kumiho_workflows: Vec<WorkflowResponse>) -> Vec<WorkflowResponse> {
955    let builtins = discover_builtin_workflows();
956    if builtins.is_empty() {
957        return kumiho_workflows;
958    }
959
960    let builtin_names: std::collections::HashSet<String> =
961        builtins.iter().map(|b| b.item_name.clone()).collect();
962
963    // Tag Kumiho workflows that override a builtin
964    for wf in &mut kumiho_workflows {
965        if builtin_names.contains(&wf.item_name) {
966            wf.source = "builtin-modified".to_string();
967        }
968    }
969
970    // Add builtins that have no Kumiho override
971    let kumiho_names: std::collections::HashSet<String> = kumiho_workflows
972        .iter()
973        .map(|w| w.item_name.clone())
974        .collect();
975    for builtin in builtins {
976        if !kumiho_names.contains(&builtin.item_name) {
977            kumiho_workflows.push(builtin);
978        }
979    }
980
981    kumiho_workflows
982}
983
984// ── Definition Handlers ─────────────────────────────────────────────────
985
986/// GET /api/workflows
987pub async fn handle_list_workflows(
988    State(state): State<AppState>,
989    headers: HeaderMap,
990    Query(query): Query<WorkflowListQuery>,
991) -> impl IntoResponse {
992    if let Err(e) = require_auth(&state, &headers) {
993        return e.into_response();
994    }
995
996    let client = build_kumiho_client(&state);
997    let project = workflow_project(&state);
998    let space_path = workflow_space_path(&state);
999
1000    // Return cached result if available (before making API call)
1001    if query.q.is_none() {
1002        if let Some(cached) = get_cached(query.include_deprecated) {
1003            return Json(serde_json::json!({ "workflows": cached })).into_response();
1004        }
1005    }
1006
1007    let items_result = if let Some(ref q) = query.q {
1008        client
1009            .search_items(q, &project, "workflow", query.include_deprecated)
1010            .await
1011            .map(|results| results.into_iter().map(|sr| sr.item).collect::<Vec<_>>())
1012    } else {
1013        client
1014            .list_items(&space_path, query.include_deprecated)
1015            .await
1016    };
1017
1018    match items_result {
1019        Ok(items) => {
1020            let workflows = merge_with_builtins(enrich_items(&client, items).await);
1021            if query.q.is_none() {
1022                set_cached(&workflows, query.include_deprecated);
1023            }
1024            Json(serde_json::json!({ "workflows": workflows })).into_response()
1025        }
1026        Err(ref e) if matches!(e, KumihoError::Api { status: 404, .. }) => {
1027            let _ = client.ensure_project(&project).await;
1028            let _ = client.ensure_space(&project, WORKFLOW_SPACE_NAME).await;
1029            let workflows = merge_with_builtins(Vec::new());
1030            Json(serde_json::json!({ "workflows": workflows })).into_response()
1031        }
1032        Err(e) => {
1033            // On API error, try to return stale cache rather than an error
1034            if query.q.is_none() {
1035                let lock = WORKFLOW_CACHE.get_or_init(|| Mutex::new(None));
1036                let cache = lock.lock();
1037                if let Some(ref c) = *cache {
1038                    tracing::warn!("Workflows list failed, returning stale cache: {e}");
1039                    return Json(serde_json::json!({ "workflows": c.workflows })).into_response();
1040                }
1041            }
1042            kumiho_err(e).into_response()
1043        }
1044    }
1045}
1046
1047/// Result of calling the operator's `validate_workflow` MCP tool.
1048///
1049/// Mirrors the Python-side `ValidationResult.to_dict()` shape:
1050/// `{ valid: bool, errors: [...], warnings: [...] }`. Unwraps the MCP
1051/// `content[0].text` envelope.
1052#[derive(Debug)]
1053struct ValidationOutcome {
1054    valid: bool,
1055    errors: Vec<serde_json::Value>,
1056    warnings: Vec<serde_json::Value>,
1057}
1058
1059/// Call the operator's `validate_workflow` tool via MCP. Returns a structured
1060/// outcome. Any transport/parse failure is returned as `Err(String)` — callers
1061/// should fail-open (allow the operation) rather than block on infra errors.
1062async fn validate_via_operator(
1063    state: &AppState,
1064    args: serde_json::Map<String, serde_json::Value>,
1065) -> Result<ValidationOutcome, String> {
1066    let tool_name = format!(
1067        "{}__validate_workflow",
1068        crate::agent::operator::OPERATOR_SERVER_NAME
1069    );
1070
1071    let registry = state
1072        .mcp_registry
1073        .as_ref()
1074        .ok_or_else(|| "MCP registry not available — operator not connected".to_string())?;
1075
1076    let fut = registry.call_tool(&tool_name, serde_json::Value::Object(args));
1077    let result_str = match tokio::time::timeout(std::time::Duration::from_secs(15), fut).await {
1078        Ok(Ok(s)) => s,
1079        Ok(Err(e)) => return Err(format!("operator validate_workflow failed: {e:#}")),
1080        Err(_) => return Err("operator validate_workflow timed out (15s)".to_string()),
1081    };
1082
1083    // Outer MCP envelope: { "content": [{"type":"text","text":"<json>"}], ... }
1084    let outer: serde_json::Value = serde_json::from_str(&result_str)
1085        .map_err(|e| format!("validate_workflow: outer JSON parse failed: {e}"))?;
1086
1087    let inner_text = outer
1088        .get("content")
1089        .and_then(|c| c.get(0))
1090        .and_then(|c0| c0.get("text"))
1091        .and_then(|t| t.as_str())
1092        .ok_or_else(|| "validate_workflow: missing content[0].text".to_string())?;
1093
1094    let inner: serde_json::Value = serde_json::from_str(inner_text)
1095        .map_err(|e| format!("validate_workflow: inner JSON parse failed: {e}"))?;
1096
1097    let valid = inner
1098        .get("valid")
1099        .and_then(|v| v.as_bool())
1100        .ok_or_else(|| "validate_workflow: missing `valid` field".to_string())?;
1101    let errors = inner
1102        .get("errors")
1103        .and_then(|v| v.as_array())
1104        .cloned()
1105        .unwrap_or_default();
1106    let warnings = inner
1107        .get("warnings")
1108        .and_then(|v| v.as_array())
1109        .cloned()
1110        .unwrap_or_default();
1111
1112    Ok(ValidationOutcome {
1113        valid,
1114        errors,
1115        warnings,
1116    })
1117}
1118
1119/// Build the 400 response body for a failed validation.
1120fn validation_error_response(
1121    outcome: &ValidationOutcome,
1122    context: &str,
1123) -> (StatusCode, Json<serde_json::Value>) {
1124    (
1125        StatusCode::BAD_REQUEST,
1126        Json(serde_json::json!({
1127            "error": format!("Workflow validation failed: {context}"),
1128            "valid": false,
1129            "errors": outcome.errors,
1130            "warnings": outcome.warnings,
1131        })),
1132    )
1133}
1134
1135/// POST /api/workflows
1136pub async fn handle_create_workflow(
1137    State(state): State<AppState>,
1138    headers: HeaderMap,
1139    Json(body): Json<CreateWorkflowBody>,
1140) -> impl IntoResponse {
1141    if let Err(e) = require_auth(&state, &headers) {
1142        return e.into_response();
1143    }
1144
1145    // Validate YAML before persisting — reject syntactically or schematically broken
1146    // definitions so they never reach storage (and thus never silently fail at dispatch).
1147    let mut v_args = serde_json::Map::new();
1148    v_args.insert(
1149        "workflow_yaml".to_string(),
1150        serde_json::Value::String(body.definition.clone()),
1151    );
1152    match validate_via_operator(&state, v_args).await {
1153        Ok(outcome) if !outcome.valid => {
1154            return validation_error_response(&outcome, "cannot save invalid workflow")
1155                .into_response();
1156        }
1157        Ok(_) => {}
1158        Err(e) => {
1159            tracing::warn!("create_workflow: validation skipped (infra error): {e}");
1160        }
1161    }
1162
1163    let client = build_kumiho_client(&state);
1164    let project = workflow_project(&state);
1165    let space_path = workflow_space_path(&state);
1166
1167    if let Err(e) = client.ensure_project(&project).await {
1168        return kumiho_err(e).into_response();
1169    }
1170    if let Err(e) = client.ensure_space(&project, WORKFLOW_SPACE_NAME).await {
1171        return kumiho_err(e).into_response();
1172    }
1173
1174    let slug = slugify(&body.name);
1175    let item = match client
1176        .create_item(&space_path, &slug, "workflow", HashMap::new())
1177        .await
1178    {
1179        Ok(item) => item,
1180        Err(e) => return kumiho_err(e).into_response(),
1181    };
1182
1183    let metadata = workflow_metadata(&body);
1184    let rev = match client.create_revision(&item.kref, metadata).await {
1185        Ok(rev) => rev,
1186        Err(e) => return kumiho_err(e).into_response(),
1187    };
1188
1189    // Persist YAML to disk and register artifact BEFORE publishing (published revisions are immutable)
1190    persist_workflow_artifact(&client, &rev.kref, rev.number, &body.name, &body.definition).await;
1191    let _ = client.tag_revision(&rev.kref, "published").await;
1192
1193    invalidate_cache();
1194    sync_cron_for_workflow(&state, &body.name, &body.definition);
1195
1196    let workflow = to_workflow_response(&item, Some(&rev));
1197    (
1198        StatusCode::CREATED,
1199        Json(serde_json::json!({ "workflow": workflow })),
1200    )
1201        .into_response()
1202}
1203
1204/// PUT /api/workflows/{*kref}
1205pub async fn handle_update_workflow(
1206    State(state): State<AppState>,
1207    headers: HeaderMap,
1208    Path(kref): Path<String>,
1209    Json(body): Json<CreateWorkflowBody>,
1210) -> impl IntoResponse {
1211    if let Err(e) = require_auth(&state, &headers) {
1212        return e.into_response();
1213    }
1214
1215    // Validate YAML before persisting the new revision.
1216    let mut v_args = serde_json::Map::new();
1217    v_args.insert(
1218        "workflow_yaml".to_string(),
1219        serde_json::Value::String(body.definition.clone()),
1220    );
1221    match validate_via_operator(&state, v_args).await {
1222        Ok(outcome) if !outcome.valid => {
1223            return validation_error_response(&outcome, "cannot save invalid workflow")
1224                .into_response();
1225        }
1226        Ok(_) => {}
1227        Err(e) => {
1228            tracing::warn!("update_workflow: validation skipped (infra error): {e}");
1229        }
1230    }
1231
1232    let kref = format!("kref://{kref}");
1233    let client = build_kumiho_client(&state);
1234
1235    let metadata = workflow_metadata(&body);
1236    let rev = match client.create_revision(&kref, metadata).await {
1237        Ok(rev) => rev,
1238        Err(e) => return kumiho_err(e).into_response(),
1239    };
1240
1241    // Persist YAML to disk and register artifact BEFORE publishing (published revisions are immutable)
1242    persist_workflow_artifact(&client, &rev.kref, rev.number, &body.name, &body.definition).await;
1243    let _ = client.tag_revision(&rev.kref, "published").await;
1244
1245    let items = match client.list_items(&workflow_space_path(&state), true).await {
1246        Ok(items) => items,
1247        Err(e) => return kumiho_err(e).into_response(),
1248    };
1249
1250    invalidate_cache();
1251    sync_cron_for_workflow(&state, &body.name, &body.definition);
1252
1253    let item = items.iter().find(|i| i.kref == kref);
1254    match item {
1255        Some(item) => {
1256            let workflow = to_workflow_response(item, Some(&rev));
1257            Json(serde_json::json!({ "workflow": workflow })).into_response()
1258        }
1259        None => {
1260            let fallback = ItemResponse {
1261                kref: kref.clone(),
1262                name: body.name.clone(),
1263                item_name: body.name.clone(),
1264                kind: "workflow".to_string(),
1265                deprecated: false,
1266                created_at: None,
1267                metadata: HashMap::new(),
1268            };
1269            let workflow = to_workflow_response(&fallback, Some(&rev));
1270            Json(serde_json::json!({ "workflow": workflow })).into_response()
1271        }
1272    }
1273}
1274
1275/// POST /api/workflows/deprecate
1276pub async fn handle_deprecate_workflow(
1277    State(state): State<AppState>,
1278    headers: HeaderMap,
1279    Json(body): Json<DeprecateBody>,
1280) -> impl IntoResponse {
1281    if let Err(e) = require_auth(&state, &headers) {
1282        return e.into_response();
1283    }
1284
1285    let kref = body.kref.clone();
1286    let client = build_kumiho_client(&state);
1287
1288    match client.deprecate_item(&kref, body.deprecated).await {
1289        Ok(item) => {
1290            invalidate_cache();
1291            let rev = client.get_published_or_latest(&kref).await.ok();
1292
1293            // Sync cron triggers: remove when deprecating, re-add when restoring.
1294            if body.deprecated {
1295                // Remove cron jobs for this workflow
1296                if let Some(item_segment) = kref.split('/').last() {
1297                    let workflow_name = item_segment
1298                        .rsplit_once('.')
1299                        .map(|(name, _kind)| name)
1300                        .unwrap_or(item_segment);
1301                    let config = state.config.lock();
1302                    if let Err(e) = crate::cron::remove_workflow_cron_jobs(&config, workflow_name) {
1303                        tracing::warn!("Failed to remove cron jobs for deprecated workflow: {e}");
1304                    }
1305                }
1306            } else if let Some(ref rev) = rev {
1307                // Restoring — re-sync cron triggers from the definition
1308                if let Some(definition) = rev.metadata.get("definition") {
1309                    sync_cron_for_workflow(&state, &item.item_name, definition);
1310                }
1311            }
1312
1313            let workflow = to_workflow_response(&item, rev.as_ref());
1314            Json(serde_json::json!({ "workflow": workflow })).into_response()
1315        }
1316        Err(e) => kumiho_err(e).into_response(),
1317    }
1318}
1319
1320/// DELETE /api/workflows/{*kref}
1321pub async fn handle_delete_workflow(
1322    State(state): State<AppState>,
1323    headers: HeaderMap,
1324    Path(kref): Path<String>,
1325) -> impl IntoResponse {
1326    if let Err(e) = require_auth(&state, &headers) {
1327        return e.into_response();
1328    }
1329
1330    let kref = format!("kref://{kref}");
1331    let client = build_kumiho_client(&state);
1332
1333    match client.delete_item(&kref).await {
1334        Ok(()) => {
1335            invalidate_cache();
1336
1337            // Remove associated cron jobs.  Extract the item_name from the kref
1338            // (the last path segment minus the `.workflow` kind suffix) and use
1339            // it as the workflow name for cron cleanup.
1340            if let Some(item_segment) = kref.split('/').last() {
1341                let workflow_name = item_segment
1342                    .rsplit_once('.')
1343                    .map(|(name, _kind)| name)
1344                    .unwrap_or(item_segment);
1345                let config = state.config.lock();
1346                if let Err(e) = crate::cron::remove_workflow_cron_jobs(&config, workflow_name) {
1347                    tracing::warn!("Failed to remove cron jobs for deleted workflow: {e}");
1348                }
1349            }
1350
1351            StatusCode::NO_CONTENT.into_response()
1352        }
1353        Err(e) => kumiho_err(e).into_response(),
1354    }
1355}
1356
1357/// POST /api/workflows/run/{name}
1358///
1359/// Triggers a workflow run request.  Creates a `workflow-run-request` item in
1360/// Kumiho so the scheduler or operator can pick it up.
1361pub async fn handle_run_workflow(
1362    State(state): State<AppState>,
1363    headers: HeaderMap,
1364    Path(name): Path<String>,
1365    body: Option<Json<RunWorkflowBody>>,
1366) -> impl IntoResponse {
1367    if let Err(e) = require_auth(&state, &headers) {
1368        return e.into_response();
1369    }
1370
1371    let inputs = body
1372        .as_ref()
1373        .map(|b| b.inputs.clone())
1374        .unwrap_or(serde_json::Value::Object(Default::default()));
1375    let cwd = body.as_ref().and_then(|b| b.cwd.clone());
1376
1377    // Pre-dispatch validation: resolve the named workflow (from builtins/Kumiho)
1378    // and run the schema validator. Blocks silent failures where a malformed
1379    // definition gets enqueued but the async runner can't parse it.
1380    let mut v_args = serde_json::Map::new();
1381    v_args.insert(
1382        "workflow".to_string(),
1383        serde_json::Value::String(name.clone()),
1384    );
1385    if let Some(ref c) = cwd {
1386        v_args.insert("cwd".to_string(), serde_json::Value::String(c.clone()));
1387    }
1388    match validate_via_operator(&state, v_args).await {
1389        Ok(outcome) if !outcome.valid => {
1390            return validation_error_response(&outcome, "cannot dispatch invalid workflow")
1391                .into_response();
1392        }
1393        Ok(_) => {}
1394        Err(e) => {
1395            tracing::warn!("run_workflow: validation skipped (infra error): {e}");
1396        }
1397    }
1398
1399    let run_id = uuid::Uuid::new_v4().to_string();
1400    let now = chrono::Utc::now().to_rfc3339();
1401
1402    let client = build_kumiho_client(&state);
1403    let project = workflow_project(&state);
1404    let requests_space_path = workflow_run_requests_space_path(&state);
1405
1406    // Ensure the WorkflowRunRequests space exists
1407    let _ = client.ensure_project(&project).await;
1408    let _ = client
1409        .ensure_space(&project, WORKFLOW_RUN_REQUESTS_SPACE_NAME)
1410        .await;
1411
1412    let mut metadata = HashMap::new();
1413    metadata.insert("workflow_name".to_string(), name.clone());
1414    metadata.insert("run_id".to_string(), run_id.clone());
1415    metadata.insert("inputs".to_string(), inputs.to_string());
1416    metadata.insert("cwd".to_string(), cwd.unwrap_or_default());
1417    metadata.insert("trigger_source".to_string(), "api".to_string());
1418    metadata.insert("requested_at".to_string(), now);
1419
1420    let item_name = format!("run-{}", &run_id[..run_id.len().min(12)]);
1421
1422    match client
1423        .create_item(
1424            &requests_space_path,
1425            &item_name,
1426            "workflow-run-request",
1427            metadata.clone(),
1428        )
1429        .await
1430    {
1431        Ok(item) => {
1432            if let Ok(rev) = client.create_revision(&item.kref, metadata).await {
1433                let _ = client.tag_revision(&rev.kref, "pending").await;
1434            }
1435            (
1436                StatusCode::OK,
1437                Json(serde_json::json!({
1438                    "run_id": run_id,
1439                    "workflow": name,
1440                    "status": "pending",
1441                })),
1442            )
1443                .into_response()
1444        }
1445        Err(e) => {
1446            tracing::warn!("Failed to create workflow run request: {e}");
1447            (
1448                StatusCode::INTERNAL_SERVER_ERROR,
1449                Json(serde_json::json!({
1450                    "error": format!("Failed to create run request: {e}")
1451                })),
1452            )
1453                .into_response()
1454        }
1455    }
1456}
1457
1458/// GET /api/workflows/revisions/{*kref}
1459///
1460/// Fetches a workflow definition pinned to a specific Kumiho revision kref
1461/// (e.g. `kref://Construct/Workflows/my-wf.workflow?r=3`). Used by the dashboard
1462/// DAG viewer to render the exact YAML a run executed, independent of whatever
1463/// is currently tagged `published` on the workflow item.
1464pub async fn handle_get_workflow_by_revision(
1465    State(state): State<AppState>,
1466    headers: HeaderMap,
1467    Path(kref): Path<String>,
1468) -> impl IntoResponse {
1469    if let Err(e) = require_auth(&state, &headers) {
1470        return e.into_response();
1471    }
1472
1473    let revision_kref = if kref.starts_with("kref://") {
1474        kref.clone()
1475    } else {
1476        format!("kref://{kref}")
1477    };
1478
1479    let client = build_kumiho_client(&state);
1480
1481    let mut rev = match client.get_revision(&revision_kref).await {
1482        Ok(r) => r,
1483        Err(e) => return kumiho_err(e).into_response(),
1484    };
1485
1486    // Canonical source for a pinned revision's YAML is the `workflow.yaml`
1487    // artifact on disk. The inline `definition` metadata key is a legacy
1488    // gateway-authored field and drifts from the artifact for operator-
1489    // authored revisions, so we always prefer the artifact and only fall
1490    // back to metadata when no artifact exists.
1491    if let Ok(artifact) = client
1492        .get_artifact_by_name(&rev.kref, "workflow.yaml")
1493        .await
1494    {
1495        let path = artifact
1496            .location
1497            .strip_prefix("file://")
1498            .unwrap_or(&artifact.location);
1499        if let Ok(yaml) = tokio::fs::read_to_string(path).await {
1500            rev.metadata.insert("definition".to_string(), yaml);
1501        }
1502    }
1503
1504    // Derive a minimal item from the revision's item_kref. The DAG viewer only
1505    // consumes `definition` (YAML) and `revision_number` from the response.
1506    let item_name = rev
1507        .item_kref
1508        .rsplit('/')
1509        .next()
1510        .map(|seg| {
1511            seg.rsplit_once('.')
1512                .map(|(n, _)| n)
1513                .unwrap_or(seg)
1514                .to_string()
1515        })
1516        .unwrap_or_default();
1517
1518    let item = ItemResponse {
1519        kref: rev.item_kref.clone(),
1520        name: item_name.clone(),
1521        item_name,
1522        kind: "workflow".to_string(),
1523        deprecated: false,
1524        created_at: rev.created_at.clone(),
1525        metadata: HashMap::new(),
1526    };
1527
1528    let workflow = to_workflow_response(&item, Some(&rev));
1529    Json(serde_json::json!({ "workflow": workflow })).into_response()
1530}
1531
1532// ── Run Handlers ────────────────────────────────────────────────────────
1533
1534/// GET /api/workflows/runs
1535pub async fn handle_list_workflow_runs(
1536    State(state): State<AppState>,
1537    headers: HeaderMap,
1538    Query(query): Query<WorkflowRunsQuery>,
1539) -> impl IntoResponse {
1540    if let Err(e) = require_auth(&state, &headers) {
1541        return e.into_response();
1542    }
1543
1544    let client = build_kumiho_client(&state);
1545    let project = workflow_project(&state);
1546    let runs_space = workflow_runs_space_path(&state);
1547
1548    match client.list_items(&runs_space, false).await {
1549        Ok(mut items) => {
1550            // Only include workflow_run kind items
1551            items.retain(|i| i.kind == "workflow_run");
1552
1553            if let Some(ref wf_name) = query.workflow {
1554                items.retain(|item| {
1555                    item.metadata
1556                        .get("workflow_name")
1557                        .or_else(|| item.metadata.get("workflow"))
1558                        .map(|n| n == wf_name)
1559                        .unwrap_or(false)
1560                });
1561            }
1562
1563            items.sort_by(|a, b| {
1564                let a_time = a.created_at.as_deref().unwrap_or("");
1565                let b_time = b.created_at.as_deref().unwrap_or("");
1566                b_time.cmp(a_time)
1567            });
1568            items.truncate(query.limit);
1569
1570            let krefs: Vec<String> = items.iter().map(|i| i.kref.clone()).collect();
1571            let rev_map = client
1572                .batch_get_revisions(&krefs, "latest")
1573                .await
1574                .unwrap_or_default();
1575
1576            let runs: Vec<WorkflowRunSummary> = items
1577                .iter()
1578                .map(|item| to_run_summary(item, rev_map.get(&item.kref)))
1579                .collect();
1580
1581            Json(serde_json::json!({ "runs": runs, "count": runs.len() })).into_response()
1582        }
1583        Err(ref e) if matches!(e, KumihoError::Api { status: 404, .. }) => {
1584            let _ = client.ensure_project(&project).await;
1585            let _ = client
1586                .ensure_space(&project, WORKFLOW_RUNS_SPACE_NAME)
1587                .await;
1588            Json(serde_json::json!({ "runs": [], "count": 0 })).into_response()
1589        }
1590        Err(e) => {
1591            let msg = format!("Failed to fetch workflow runs: {e}");
1592            (
1593                StatusCode::SERVICE_UNAVAILABLE,
1594                Json(serde_json::json!({ "error": msg })),
1595            )
1596                .into_response()
1597        }
1598    }
1599}
1600
1601/// GET /api/workflows/runs/{run_id}
1602pub async fn handle_get_workflow_run(
1603    State(state): State<AppState>,
1604    headers: HeaderMap,
1605    Path(run_id): Path<String>,
1606) -> impl IntoResponse {
1607    if let Err(e) = require_auth(&state, &headers) {
1608        return e.into_response();
1609    }
1610
1611    let client = build_kumiho_client(&state);
1612    let project = workflow_project(&state);
1613    let runs_space = workflow_runs_space_path(&state);
1614
1615    // The item name format is "{workflow_name}-{run_id[:12]}", so the first
1616    // 12 characters of the run_id are always present in the item name.
1617    let run_id_prefix = &run_id[..run_id.len().min(12)];
1618
1619    // Strategy 1 (most reliable): filter items by name containing the run_id
1620    // prefix.  This avoids fulltext-search indexing lag and does not depend
1621    // on item metadata being returned in the list response.
1622    if let Ok(items) = client
1623        .list_items_filtered(&runs_space, run_id_prefix, false)
1624        .await
1625    {
1626        // Narrow to workflow_run kind items whose name actually contains the prefix
1627        let run_id_lower = run_id.to_lowercase();
1628        let prefix_lower = run_id_lower[..run_id_lower.len().min(12)].to_string();
1629        if let Some(item) = items.iter().find(|i| {
1630            i.kind == "workflow_run" && i.item_name.to_lowercase().contains(&prefix_lower)
1631        }) {
1632            let rev = client.get_latest_revision(&item.kref).await.ok();
1633            let detail = to_run_detail(item, rev.as_ref());
1634            return Json(serde_json::json!({ "run": detail })).into_response();
1635        }
1636    }
1637
1638    // Strategy 2: full-text search by run_id (may find it if indexed in item
1639    // metadata or if the run_id appears in the item name).
1640    if let Ok(results) = client
1641        .search_items(&run_id, &project, "workflow_run", false)
1642        .await
1643    {
1644        if let Some(sr) = results.first() {
1645            let rev = client.get_latest_revision(&sr.item.kref).await.ok();
1646            let detail = to_run_detail(&sr.item, rev.as_ref());
1647            return Json(serde_json::json!({ "run": detail })).into_response();
1648        }
1649    }
1650
1651    // Strategy 3: broad list + metadata/name match as last resort
1652    match client.list_items(&runs_space, false).await {
1653        Ok(items) => {
1654            let run_id_lower = run_id.to_lowercase();
1655            let found = items.iter().find(|item| {
1656                if item.kind != "workflow_run" {
1657                    return false;
1658                }
1659                // Match by metadata run_id (if metadata is returned)
1660                if let Some(meta_run_id) = item.metadata.get("run_id") {
1661                    if meta_run_id == &run_id {
1662                        return true;
1663                    }
1664                }
1665                // Match by item_name containing the run_id prefix (first 12 chars)
1666                let prefix = &run_id_lower[..run_id_lower.len().min(12)];
1667                item.item_name.to_lowercase().contains(prefix)
1668            });
1669
1670            match found {
1671                Some(item) => {
1672                    let rev = client.get_latest_revision(&item.kref).await.ok();
1673                    let detail = to_run_detail(item, rev.as_ref());
1674                    Json(serde_json::json!({ "run": detail })).into_response()
1675                }
1676                None => (
1677                    StatusCode::NOT_FOUND,
1678                    Json(serde_json::json!({ "error": format!("Run '{run_id}' not found") })),
1679                )
1680                    .into_response(),
1681            }
1682        }
1683        Err(e) => {
1684            let msg = format!("Kumiho error looking up run '{run_id}': {e}");
1685            tracing::warn!("{msg}");
1686            (
1687                StatusCode::SERVICE_UNAVAILABLE,
1688                Json(serde_json::json!({ "error": msg })),
1689            )
1690                .into_response()
1691        }
1692    }
1693}
1694
1695/// DELETE /api/workflows/runs/{run_id}
1696///
1697/// Deletes a workflow run from the WorkflowRuns space.  Finds the item by
1698/// run_id prefix matching (same strategy as the GET handler) then calls
1699/// `delete_item` on the resolved kref.
1700pub async fn handle_delete_workflow_run(
1701    State(state): State<AppState>,
1702    headers: HeaderMap,
1703    Path(run_id): Path<String>,
1704) -> impl IntoResponse {
1705    if let Err(e) = require_auth(&state, &headers) {
1706        return e.into_response();
1707    }
1708
1709    let client = build_kumiho_client(&state);
1710    let runs_space = workflow_runs_space_path(&state);
1711
1712    // Resolve the run item — reuse the same prefix-matching logic as the GET
1713    let run_id_prefix = &run_id[..run_id.len().min(12)];
1714
1715    let kref = if let Ok(items) = client
1716        .list_items_filtered(&runs_space, run_id_prefix, false)
1717        .await
1718    {
1719        let run_id_lower = run_id.to_lowercase();
1720        let prefix_lower = run_id_lower[..run_id_lower.len().min(12)].to_string();
1721        items
1722            .iter()
1723            .find(|i| {
1724                i.kind == "workflow_run" && i.item_name.to_lowercase().contains(&prefix_lower)
1725            })
1726            .map(|i| i.kref.clone())
1727    } else {
1728        None
1729    };
1730
1731    let kref = match kref {
1732        Some(k) => k,
1733        None => {
1734            return (
1735                StatusCode::NOT_FOUND,
1736                Json(serde_json::json!({ "error": format!("Run '{run_id}' not found") })),
1737            )
1738                .into_response();
1739        }
1740    };
1741
1742    match client.delete_item(&kref).await {
1743        Ok(()) => {
1744            cleanup_local_run_files(&run_id).await;
1745            StatusCode::NO_CONTENT.into_response()
1746        }
1747        Err(e) => {
1748            let msg = format!("Failed to delete run '{run_id}': {e}");
1749            tracing::warn!("{msg}");
1750            kumiho_err(e).into_response()
1751        }
1752    }
1753}
1754
1755/// Best-effort cleanup of on-disk run state after a successful Kumiho hard delete.
1756/// Removes the checkpoint at `~/.construct/workflow_checkpoints/{run_id}.json` and
1757/// any artifacts directory at `~/.construct/artifacts/<workflow>/{run_id}/`. Since
1758/// the workflow name isn't carried into this handler, we scan the artifacts root
1759/// for any subdirectory containing a matching run_id directory. Failures are logged
1760/// but do not affect the API response — the authoritative delete already succeeded.
1761async fn cleanup_local_run_files(run_id: &str) {
1762    let Some(user_dirs) = directories::UserDirs::new() else {
1763        return;
1764    };
1765    let home = user_dirs.home_dir().to_path_buf();
1766
1767    let checkpoint = home.join(format!(".construct/workflow_checkpoints/{run_id}.json"));
1768    if let Err(e) = tokio::fs::remove_file(&checkpoint).await {
1769        if e.kind() != std::io::ErrorKind::NotFound {
1770            tracing::warn!("Failed to remove checkpoint {}: {e}", checkpoint.display());
1771        }
1772    }
1773
1774    let artifacts_root = home.join(".construct/artifacts");
1775    let mut entries = match tokio::fs::read_dir(&artifacts_root).await {
1776        Ok(e) => e,
1777        Err(_) => return,
1778    };
1779    while let Ok(Some(entry)) = entries.next_entry().await {
1780        let candidate = entry.path().join(run_id);
1781        if tokio::fs::metadata(&candidate).await.is_ok() {
1782            if let Err(e) = tokio::fs::remove_dir_all(&candidate).await {
1783                tracing::warn!(
1784                    "Failed to remove artifacts dir {}: {e}",
1785                    candidate.display()
1786                );
1787            }
1788        }
1789    }
1790}
1791
1792/// POST /api/workflows/runs/{run_id}/approve
1793///
1794/// Body: { "approved": bool, "feedback": string (optional) }
1795///
1796/// Approves or rejects a paused workflow step. Atomically claims the approval
1797/// from the registry to prevent race conditions with Discord.
1798pub async fn handle_approve_workflow_run(
1799    State(state): State<AppState>,
1800    headers: HeaderMap,
1801    Path(run_id): Path<String>,
1802    Json(body): Json<ApproveWorkflowBody>,
1803) -> impl IntoResponse {
1804    if let Err(e) = require_auth(&state, &headers) {
1805        return e.into_response();
1806    }
1807
1808    let approved = body.approved;
1809    let feedback = body.feedback.unwrap_or_default();
1810
1811    // Atomically claim the approval from the registry to prevent races with Discord.
1812    // If None returned, the registry may have been lost (gateway restart) — fall through
1813    // and call resume_workflow directly; the operator validates paused state itself.
1814    let claimed = state.approval_registry.try_claim(&run_id);
1815    let cwd = claimed
1816        .as_ref()
1817        .map(|a| a.cwd.clone())
1818        .unwrap_or_else(|| "/tmp".to_string());
1819
1820    if claimed.is_none() {
1821        tracing::info!(
1822            "approve_workflow_run: no registry entry for run_id={run_id} (gateway restart?), \
1823             calling resume_workflow directly"
1824        );
1825    }
1826
1827    // Call the operator MCP tool `resume_workflow`
1828    let tool_name = format!(
1829        "{}__resume_workflow",
1830        crate::agent::operator::OPERATOR_SERVER_NAME
1831    );
1832    let mut tool_args = serde_json::Map::new();
1833    tool_args.insert(
1834        "run_id".to_string(),
1835        serde_json::Value::String(run_id.clone()),
1836    );
1837    tool_args.insert("approved".to_string(), serde_json::Value::Bool(approved));
1838    tool_args.insert(
1839        "response".to_string(),
1840        serde_json::Value::String(feedback.clone()),
1841    );
1842    tool_args.insert("cwd".to_string(), serde_json::Value::String(cwd));
1843
1844    let mcp_result = if let Some(ref registry) = state.mcp_registry {
1845        let mcp_future = registry.call_tool(&tool_name, serde_json::Value::Object(tool_args));
1846        match tokio::time::timeout(std::time::Duration::from_secs(30), mcp_future).await {
1847            Ok(Ok(result_str)) => Ok(result_str),
1848            Ok(Err(e)) => Err(format!("operator tool call failed: {e:#}")),
1849            Err(_) => Err("operator tool call timed out (30s)".to_string()),
1850        }
1851    } else {
1852        Err("MCP registry not available — operator not connected".to_string())
1853    };
1854
1855    match mcp_result {
1856        Ok(_) => {
1857            // Broadcast a human_approval_resolved SSE event so connected dashboards
1858            // can update their UI immediately without waiting for the next REST poll.
1859            let _ = state.event_tx.send(serde_json::json!({
1860                "type": "human_approval_resolved",
1861                "run_id": run_id,
1862                "approved": approved,
1863                "timestamp": chrono::Utc::now().to_rfc3339(),
1864            }));
1865
1866            (
1867                StatusCode::OK,
1868                Json(serde_json::json!({
1869                    "status": "ok",
1870                    "message": if approved { "Workflow approved" } else { "Workflow rejected" },
1871                    "run_id": run_id,
1872                    "approved": approved,
1873                })),
1874            )
1875                .into_response()
1876        }
1877        Err(e) => {
1878            tracing::warn!("approve_workflow_run: failed for run_id={run_id}: {e}");
1879            (
1880                StatusCode::BAD_GATEWAY,
1881                Json(serde_json::json!({
1882                    "error": format!("Failed to resume workflow: {e}")
1883                })),
1884            )
1885                .into_response()
1886        }
1887    }
1888}
1889
1890#[derive(Deserialize)]
1891pub struct ApproveWorkflowBody {
1892    pub approved: bool,
1893    pub feedback: Option<String>,
1894}
1895
1896/// POST /api/workflows/runs/{run_id}/retry
1897///
1898/// Body: { "cwd": string (optional) }
1899///
1900/// Retries a failed workflow run from the first failed step. Successful step
1901/// outputs are preserved so only the failed step + downstream steps re-execute.
1902pub async fn handle_retry_workflow_run(
1903    State(state): State<AppState>,
1904    headers: HeaderMap,
1905    Path(run_id): Path<String>,
1906    body: Option<Json<RetryWorkflowBody>>,
1907) -> impl IntoResponse {
1908    if let Err(e) = require_auth(&state, &headers) {
1909        return e.into_response();
1910    }
1911
1912    let cwd = body
1913        .and_then(|Json(b)| b.cwd)
1914        .unwrap_or_else(|| "/tmp".to_string());
1915
1916    let tool_name = format!(
1917        "{}__retry_workflow",
1918        crate::agent::operator::OPERATOR_SERVER_NAME
1919    );
1920    let mut tool_args = serde_json::Map::new();
1921    tool_args.insert(
1922        "run_id".to_string(),
1923        serde_json::Value::String(run_id.clone()),
1924    );
1925    tool_args.insert("cwd".to_string(), serde_json::Value::String(cwd));
1926
1927    let mcp_result = if let Some(ref registry) = state.mcp_registry {
1928        let mcp_future = registry.call_tool(&tool_name, serde_json::Value::Object(tool_args));
1929        match tokio::time::timeout(std::time::Duration::from_secs(30), mcp_future).await {
1930            Ok(Ok(result_str)) => Ok(result_str),
1931            Ok(Err(e)) => Err(format!("operator tool call failed: {e:#}")),
1932            Err(_) => Err("operator tool call timed out (30s)".to_string()),
1933        }
1934    } else {
1935        Err("MCP registry not available — operator not connected".to_string())
1936    };
1937
1938    match mcp_result {
1939        Ok(result_str) => {
1940            let _ = state.event_tx.send(serde_json::json!({
1941                "type": "workflow_retry",
1942                "run_id": run_id,
1943                "timestamp": chrono::Utc::now().to_rfc3339(),
1944            }));
1945            let payload = serde_json::from_str::<serde_json::Value>(&result_str)
1946                .unwrap_or_else(|_| serde_json::json!({"raw": result_str}));
1947            (StatusCode::OK, Json(payload)).into_response()
1948        }
1949        Err(e) => {
1950            tracing::warn!("retry_workflow_run: failed for run_id={run_id}: {e}");
1951            (
1952                StatusCode::BAD_GATEWAY,
1953                Json(serde_json::json!({ "error": format!("Failed to retry workflow: {e}") })),
1954            )
1955                .into_response()
1956        }
1957    }
1958}
1959
1960#[derive(Deserialize)]
1961pub struct RetryWorkflowBody {
1962    pub cwd: Option<String>,
1963}
1964
1965/// GET /api/workflows/agent-activity/{agent_id}
1966///
1967/// Reads the RunLog JSONL file for an agent and returns structured activity data.
1968/// Used by the Live Execution View for on-demand drill-down into agent tool calls,
1969/// messages, and results.
1970pub async fn handle_agent_activity(
1971    State(state): State<AppState>,
1972    headers: HeaderMap,
1973    Path(agent_id): Path<String>,
1974    Query(query): Query<AgentActivityQuery>,
1975) -> impl IntoResponse {
1976    if let Err(e) = require_auth(&state, &headers) {
1977        return e.into_response();
1978    }
1979
1980    let runlogs_dir =
1981        std::path::PathBuf::from(std::env::var("HOME").unwrap_or_else(|_| "/tmp".into()))
1982            .join(".construct/operator_mcp/runlogs");
1983    let path = runlogs_dir.join(format!("{agent_id}.jsonl"));
1984
1985    if !path.exists() {
1986        return (
1987            StatusCode::NOT_FOUND,
1988            Json(serde_json::json!({ "error": "No run log found for this agent" })),
1989        )
1990            .into_response();
1991    }
1992
1993    let content = match std::fs::read_to_string(&path) {
1994        Ok(c) => c,
1995        Err(e) => {
1996            return (
1997                StatusCode::INTERNAL_SERVER_ERROR,
1998                Json(serde_json::json!({ "error": format!("Failed to read log: {e}") })),
1999            )
2000                .into_response();
2001        }
2002    };
2003
2004    let view = query.view.as_deref().unwrap_or("summary");
2005    let limit = query.limit.unwrap_or(100).min(500) as usize;
2006
2007    let entries: Vec<serde_json::Value> = content
2008        .lines()
2009        .filter_map(|line| serde_json::from_str(line).ok())
2010        .collect();
2011
2012    match view {
2013        "tool_calls" => {
2014            // Return tool calls with full args and results
2015            let tools: Vec<&serde_json::Value> = entries
2016                .iter()
2017                .filter(|e| e.get("kind").and_then(|v| v.as_str()) == Some("tool_call"))
2018                .collect();
2019            let total = tools.len();
2020            let slice: Vec<_> = tools.into_iter().rev().take(limit).collect();
2021            Json(serde_json::json!({
2022                "agent_id": agent_id,
2023                "view": "tool_calls",
2024                "total": total,
2025                "entries": slice,
2026            }))
2027            .into_response()
2028        }
2029        "messages" => {
2030            // Return assistant messages
2031            let msgs: Vec<&serde_json::Value> = entries
2032                .iter()
2033                .filter(|e| {
2034                    let kind = e.get("kind").and_then(|v| v.as_str()).unwrap_or("");
2035                    kind == "message" || kind == "user_message"
2036                })
2037                .collect();
2038            let total = msgs.len();
2039            let slice: Vec<_> = msgs.into_iter().rev().take(limit).collect();
2040            Json(serde_json::json!({
2041                "agent_id": agent_id,
2042                "view": "messages",
2043                "total": total,
2044                "entries": slice,
2045            }))
2046            .into_response()
2047        }
2048        "errors" => {
2049            let errs: Vec<&serde_json::Value> = entries
2050                .iter()
2051                .filter(|e| {
2052                    let kind = e.get("kind").and_then(|v| v.as_str()).unwrap_or("");
2053                    kind == "error"
2054                        || kind == "turn_failed"
2055                        || e.get("status").and_then(|v| v.as_str()) == Some("failed")
2056                })
2057                .collect();
2058            Json(serde_json::json!({
2059                "agent_id": agent_id,
2060                "view": "errors",
2061                "total": errs.len(),
2062                "entries": errs,
2063            }))
2064            .into_response()
2065        }
2066        "full" => {
2067            // Last N entries (most recent)
2068            let total = entries.len();
2069            let slice: Vec<_> = entries.into_iter().rev().take(limit).collect();
2070            Json(serde_json::json!({
2071                "agent_id": agent_id,
2072                "view": "full",
2073                "total": total,
2074                "entries": slice,
2075            }))
2076            .into_response()
2077        }
2078        _ => {
2079            // Summary view: header + stats + last message + recent tool calls
2080            let header = entries.first().cloned().unwrap_or_default();
2081            let tool_count = entries
2082                .iter()
2083                .filter(|e| e.get("kind").and_then(|v| v.as_str()) == Some("tool_call"))
2084                .count();
2085            let error_count = entries
2086                .iter()
2087                .filter(|e| {
2088                    let kind = e.get("kind").and_then(|v| v.as_str()).unwrap_or("");
2089                    kind == "error" || kind == "turn_failed"
2090                })
2091                .count();
2092            let last_message = entries
2093                .iter()
2094                .rev()
2095                .find(|e| e.get("kind").and_then(|v| v.as_str()) == Some("message"))
2096                .and_then(|e| e.get("text").and_then(|v| v.as_str()))
2097                .unwrap_or("");
2098            // Truncate to reasonable size for summary
2099            let last_msg_truncated = if last_message.len() > 5000 {
2100                &last_message[..5000]
2101            } else {
2102                last_message
2103            };
2104            // Recent tool calls (last 20)
2105            let recent_tools: Vec<_> = entries
2106                .iter()
2107                .filter(|e| e.get("kind").and_then(|v| v.as_str()) == Some("tool_call"))
2108                .rev()
2109                .take(20)
2110                .cloned()
2111                .collect();
2112            // Usage stats from turn_completed entries
2113            let mut input_tokens: u64 = 0;
2114            let mut output_tokens: u64 = 0;
2115            let mut total_cost: f64 = 0.0;
2116            for e in &entries {
2117                if e.get("kind").and_then(|v| v.as_str()) == Some("turn_completed") {
2118                    if let Some(usage) = e.get("usage") {
2119                        input_tokens += usage
2120                            .get("inputTokens")
2121                            .and_then(|v| v.as_u64())
2122                            .unwrap_or(0);
2123                        output_tokens += usage
2124                            .get("outputTokens")
2125                            .and_then(|v| v.as_u64())
2126                            .unwrap_or(0);
2127                        total_cost += usage
2128                            .get("totalCostUsd")
2129                            .and_then(|v| v.as_f64())
2130                            .unwrap_or(0.0);
2131                    }
2132                }
2133            }
2134            Json(serde_json::json!({
2135                "agent_id": agent_id,
2136                "view": "summary",
2137                "title": header.get("title").and_then(|v| v.as_str()).unwrap_or(""),
2138                "agent_type": header.get("agent_type").and_then(|v| v.as_str()).unwrap_or(""),
2139                "total_events": entries.len(),
2140                "tool_call_count": tool_count,
2141                "error_count": error_count,
2142                "last_message": last_msg_truncated,
2143                "recent_tools": recent_tools,
2144                "usage": {
2145                    "input_tokens": input_tokens,
2146                    "output_tokens": output_tokens,
2147                    "total_cost_usd": total_cost,
2148                },
2149            }))
2150            .into_response()
2151        }
2152    }
2153}
2154
2155#[derive(Deserialize)]
2156pub struct AgentActivityQuery {
2157    view: Option<String>,
2158    limit: Option<u32>,
2159}
2160
2161/// GET /api/workflows/dashboard
2162pub async fn handle_workflow_dashboard(
2163    State(state): State<AppState>,
2164    headers: HeaderMap,
2165) -> impl IntoResponse {
2166    if let Err(e) = require_auth(&state, &headers) {
2167        return e.into_response();
2168    }
2169
2170    let client = build_kumiho_client(&state);
2171    let space_path = workflow_space_path(&state);
2172    let runs_space = workflow_runs_space_path(&state);
2173
2174    // Fetch definitions from Kumiho + merge builtins
2175    let definitions = match client.list_items(&space_path, false).await {
2176        Ok(items) => merge_with_builtins(enrich_items(&client, items).await),
2177        Err(_) => merge_with_builtins(Vec::new()),
2178    };
2179    let definitions_count = definitions.len();
2180
2181    // Fetch recent runs from Kumiho
2182    let (recent_runs, total_runs) = match client.list_items(&runs_space, false).await {
2183        Ok(mut items) => {
2184            let total = items.len();
2185            items.sort_by(|a, b| {
2186                let a_time = a.created_at.as_deref().unwrap_or("");
2187                let b_time = b.created_at.as_deref().unwrap_or("");
2188                b_time.cmp(a_time)
2189            });
2190            items.truncate(5);
2191
2192            let krefs: Vec<String> = items.iter().map(|i| i.kref.clone()).collect();
2193            let rev_map = client
2194                .batch_get_revisions(&krefs, "latest")
2195                .await
2196                .unwrap_or_default();
2197
2198            let runs: Vec<WorkflowRunSummary> = items
2199                .iter()
2200                .map(|item| to_run_summary(item, rev_map.get(&item.kref)))
2201                .collect();
2202
2203            (runs, total)
2204        }
2205        Err(_) => (Vec::new(), 0),
2206    };
2207
2208    let active_runs = recent_runs
2209        .iter()
2210        .filter(|r| r.status == "running" || r.status == "paused")
2211        .count();
2212
2213    let dashboard = WorkflowDashboard {
2214        definitions_count,
2215        definitions,
2216        active_runs,
2217        recent_runs,
2218        total_runs,
2219    };
2220
2221    Json(serde_json::json!({ "dashboard": dashboard })).into_response()
2222}