1use super::AppState;
18use super::api::require_auth;
19use super::api_agents::build_kumiho_client;
20use super::kumiho_client::{ItemResponse, KumihoClient, KumihoError, RevisionResponse, slugify};
21use axum::{
22 extract::{Path, Query, State},
23 http::{HeaderMap, StatusCode},
24 response::{IntoResponse, Json},
25};
26use parking_lot::Mutex;
27use serde::{Deserialize, Serialize};
28use std::collections::HashMap;
29use std::sync::OnceLock;
30use std::time::Instant;
31
// Kumiho space names, all created under the configured harness project:
// workflow definitions, run records, and run requests respectively.
const WORKFLOW_SPACE_NAME: &str = "Workflows";
const WORKFLOW_RUNS_SPACE_NAME: &str = "WorkflowRuns";
const WORKFLOW_RUN_REQUESTS_SPACE_NAME: &str = "WorkflowRunRequests";
35
36fn workflow_project(state: &AppState) -> String {
37 state.config.lock().kumiho.harness_project.clone()
38}
39
40fn workflow_space_path(state: &AppState) -> String {
41 format!("/{}/{}", workflow_project(state), WORKFLOW_SPACE_NAME)
42}
43
44fn workflow_runs_space_path(state: &AppState) -> String {
45 format!("/{}/{}", workflow_project(state), WORKFLOW_RUNS_SPACE_NAME)
46}
47
48fn workflow_run_requests_space_path(state: &AppState) -> String {
49 format!(
50 "/{}/{}",
51 workflow_project(state),
52 WORKFLOW_RUN_REQUESTS_SPACE_NAME
53 )
54}
55
/// Single-entry in-process cache for the enriched workflow list
/// (see `get_cached` / `set_cached` / `invalidate_cache`).
struct WorkflowCache {
    workflows: Vec<WorkflowResponse>,
    // Flag the list was fetched with; a lookup with a different flag misses.
    include_deprecated: bool,
    // Store time; compared against CACHE_TTL_SECS on lookup.
    fetched_at: Instant,
}
63
// Process-wide, lazily-initialized cache holding at most one entry.
static WORKFLOW_CACHE: OnceLock<Mutex<Option<WorkflowCache>>> = OnceLock::new();
// Entries older than this many seconds are treated as stale on lookup.
const CACHE_TTL_SECS: u64 = 3;
66
67fn get_cached(include_deprecated: bool) -> Option<Vec<WorkflowResponse>> {
68 let lock = WORKFLOW_CACHE.get_or_init(|| Mutex::new(None));
69 let cache = lock.lock();
70 if let Some(ref c) = *cache {
71 if c.include_deprecated == include_deprecated
72 && c.fetched_at.elapsed().as_secs() < CACHE_TTL_SECS
73 {
74 return Some(c.workflows.clone());
75 }
76 }
77 None
78}
79
80fn set_cached(workflows: &[WorkflowResponse], include_deprecated: bool) {
81 let lock = WORKFLOW_CACHE.get_or_init(|| Mutex::new(None));
82 let mut cache = lock.lock();
83 *cache = Some(WorkflowCache {
84 workflows: workflows.to_vec(),
85 include_deprecated,
86 fetched_at: Instant::now(),
87 });
88}
89
90fn invalidate_cache() {
91 if let Some(lock) = WORKFLOW_CACHE.get() {
92 let mut cache = lock.lock();
93 if let Some(ref mut c) = *cache {
95 c.fetched_at = Instant::now() - std::time::Duration::from_secs(CACHE_TTL_SECS + 1);
96 }
97 }
98}
99
/// Query parameters for the workflow list endpoint.
#[derive(Deserialize)]
pub struct WorkflowListQuery {
    // When true, deprecated workflows are included in the listing.
    #[serde(default)]
    pub include_deprecated: bool,
    // Optional free-text search; when present the cache is bypassed.
    pub q: Option<String>,
}
108
/// Request body for creating or updating a workflow definition.
#[derive(Deserialize)]
pub struct CreateWorkflowBody {
    pub name: String,
    pub description: String,
    // Raw workflow YAML.
    pub definition: String,
    // NOTE(review): accepted but not read by any handler visible in this
    // module — the served version is derived from the revision number.
    // Confirm before removing.
    #[serde(default)]
    pub version: Option<String>,
    #[serde(default)]
    pub tags: Option<Vec<String>>,
}
119
/// Request body for toggling a workflow's deprecated flag.
#[derive(Deserialize)]
pub struct DeprecateBody {
    pub kref: String,
    // Target deprecation state — handler body continues past this chunk;
    // confirm exact semantics there.
    pub deprecated: bool,
}
125
/// Query parameters for listing workflow runs.
#[derive(Deserialize)]
pub struct WorkflowRunsQuery {
    // Maximum number of runs to return; defaults to 20 (see default_limit).
    #[serde(default = "default_limit")]
    pub limit: usize,
    // Presumably filters runs by workflow — the consuming handler is not
    // in this chunk; verify there.
    #[serde(default)]
    pub workflow: Option<String>,
}
133
/// Serde default for `WorkflowRunsQuery::limit`.
fn default_limit() -> usize {
    const DEFAULT_RUN_LIMIT: usize = 20;
    DEFAULT_RUN_LIMIT
}
137
/// Request body for triggering a workflow run.
#[derive(Deserialize)]
pub struct RunWorkflowBody {
    // Arbitrary JSON inputs for the run; defaults to JSON null.
    #[serde(default)]
    pub inputs: serde_json::Value,
    // Optional working directory — consuming handler is not in this chunk.
    #[serde(default)]
    pub cwd: Option<String>,
}
145
/// API representation of a workflow definition, built either from a
/// Kumiho item + revision (`to_workflow_response`) or from a builtin YAML
/// file (`discover_builtin_workflows`).
#[derive(Serialize, Clone)]
pub struct WorkflowResponse {
    pub kref: String,
    // Human-facing display name (falls back to the item slug).
    pub name: String,
    // Slugified name; identity key when merging with builtins.
    pub item_name: String,
    pub deprecated: bool,
    pub created_at: Option<String>,
    pub description: String,
    // Raw workflow YAML.
    pub definition: String,
    // Stringified revision number ("0" when no revision exists).
    pub version: String,
    pub tags: Vec<String>,
    // Step count scanned out of the YAML (see count_yaml_steps).
    pub steps: usize,
    pub revision_number: i32,
    // "custom", "builtin", or "builtin-modified" (see merge_with_builtins).
    // NOTE(review): serde `default` only affects Deserialize; this struct
    // derives only Serialize, so the attribute below is inert.
    #[serde(default = "default_source")]
    pub source: String,
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub triggers: Vec<WorkflowTrigger>,
}
169
/// Serde default for `WorkflowResponse::source`.
fn default_source() -> String {
    String::from("custom")
}
173
/// Item-event trigger parsed from a workflow's `triggers:` YAML section
/// (see `extract_triggers`).
#[derive(Serialize, Clone, Debug)]
pub struct WorkflowTrigger {
    pub on_kind: String,
    // Defaults to "ready" when the YAML omits it.
    pub on_tag: String,
    #[serde(skip_serializing_if = "String::is_empty")]
    pub on_name_pattern: String,
}
181
/// Flat summary of a workflow run built from run-item revision metadata
/// (see `to_run_summary`). Most fields are raw metadata strings and are
/// empty when the corresponding key is absent.
#[derive(Serialize, Clone)]
pub struct WorkflowRunSummary {
    pub kref: String,
    // From the "run_id" key, falling back to the item name.
    pub run_id: String,
    // From "workflow_name", falling back to the legacy "workflow" key.
    pub workflow_name: String,
    pub status: String,
    pub started_at: String,
    pub completed_at: String,
    pub steps_completed: String,
    pub steps_total: String,
    pub error: String,
    // NOTE(review): serde `default` only affects Deserialize; this struct
    // derives only Serialize, so only skip_serializing_if is effective.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub workflow_item_kref: String,
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub workflow_revision_kref: String,
}
203
/// One utterance in a step's agent transcript.
#[derive(Serialize, Clone)]
pub struct TranscriptEntry {
    pub speaker: String,
    pub content: String,
    // Conversation round; 0 when missing upstream.
    pub round: u32,
}
210
/// Approval-related fields decoded from a step's `output_data` metadata
/// (see `extract_steps_from_metadata`). Empty/None fields are omitted
/// from the serialized JSON.
#[derive(Serialize, Clone, Default)]
pub struct ApprovalOutputData {
    #[serde(skip_serializing_if = "Option::is_none")]
    pub awaiting_approval: Option<bool>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub approval_message: Option<String>,
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub approve_keywords: Vec<String>,
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub reject_keywords: Vec<String>,
}
222
/// Per-step detail of a workflow run, decoded from `step_<id>` metadata
/// entries (see `extract_steps_from_metadata`). Empty/None fields are
/// omitted from the serialized JSON.
#[derive(Serialize, Clone)]
pub struct WorkflowStepDetail {
    pub step_id: String,
    pub status: String,
    #[serde(skip_serializing_if = "String::is_empty")]
    pub agent_id: String,
    #[serde(skip_serializing_if = "String::is_empty")]
    pub agent_type: String,
    #[serde(skip_serializing_if = "String::is_empty")]
    pub role: String,
    #[serde(skip_serializing_if = "String::is_empty")]
    pub template_name: String,
    #[serde(skip_serializing_if = "String::is_empty")]
    pub output_preview: String,
    #[serde(skip_serializing_if = "String::is_empty")]
    pub artifact_path: String,
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub skills: Vec<String>,
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub transcript: Vec<TranscriptEntry>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub output_data: Option<ApprovalOutputData>,
}
246
/// Full run payload: the flat summary plus per-step details. The summary
/// fields are flattened into the same JSON object.
#[derive(Serialize, Clone)]
pub struct WorkflowRunDetail {
    #[serde(flatten)]
    pub summary: WorkflowRunSummary,
    pub steps: Vec<WorkflowStepDetail>,
}
253
/// Aggregate payload for the workflow dashboard view.
#[derive(Serialize)]
pub struct WorkflowDashboard {
    pub definitions_count: usize,
    pub definitions: Vec<WorkflowResponse>,
    pub active_runs: usize,
    pub recent_runs: Vec<WorkflowRunSummary>,
    pub total_runs: usize,
}
262
263fn kumiho_err(e: KumihoError) -> (StatusCode, Json<serde_json::Value>) {
266 match &e {
267 KumihoError::Unreachable(_) => (
268 StatusCode::SERVICE_UNAVAILABLE,
269 Json(serde_json::json!({ "error": format!("Kumiho service unavailable: {e}") })),
270 ),
271 KumihoError::Api { status, body } => {
272 let code = if *status == 401 || *status == 403 {
273 StatusCode::BAD_GATEWAY
274 } else {
275 StatusCode::from_u16(*status).unwrap_or(StatusCode::BAD_GATEWAY)
276 };
277 (
278 code,
279 Json(serde_json::json!({ "error": format!("Kumiho upstream: {body}") })),
280 )
281 }
282 KumihoError::Decode(msg) => (
283 StatusCode::BAD_GATEWAY,
284 Json(serde_json::json!({ "error": format!("Bad response from Kumiho: {msg}") })),
285 ),
286 }
287}
288
289fn workflow_metadata(body: &CreateWorkflowBody) -> HashMap<String, String> {
290 let mut meta = HashMap::new();
291 meta.insert("display_name".to_string(), body.name.clone());
292 meta.insert("description".to_string(), body.description.clone());
293 meta.insert("definition".to_string(), body.definition.clone());
294 meta.insert("created_by".to_string(), "construct-dashboard".to_string());
295 let steps = count_yaml_steps(&body.definition);
297 meta.insert("steps".to_string(), steps.to_string());
298 if let Some(ref tags) = body.tags {
299 if !tags.is_empty() {
300 meta.insert("tags".to_string(), tags.join(","));
301 }
302 }
303 meta.insert(
305 "_search_text".to_string(),
306 format!("{} {}", body.name, body.description),
307 );
308 meta
309}
310
/// Count step entries (`- id:` items) in workflow YAML via a lightweight
/// line scan — no YAML parser involved.
///
/// Counting starts after a top-level `steps:` or `tasks:` key and stops at
/// the next top-level key (a non-indented line that is not a list item or
/// comment). Fixes vs. the previous version: the dead
/// `!trimmed.starts_with(' ')` check is removed (trimmed text can never
/// start with a space), and tab indentation is now treated as nesting,
/// matching `extract_triggers` / `extract_cron_triggers`.
fn count_yaml_steps(content: &str) -> usize {
    let mut count = 0;
    let mut in_steps = false;
    for line in content.lines() {
        let trimmed = line.trim();
        if trimmed == "steps:" || trimmed == "tasks:" {
            in_steps = true;
            continue;
        }
        if !in_steps {
            continue;
        }
        if trimmed.starts_with("- id:") {
            count += 1;
        }
        // A new top-level key ends the steps/tasks section.
        if !trimmed.is_empty()
            && !trimmed.starts_with('-')
            && !trimmed.starts_with('#')
            && !line.starts_with(' ')
            && !line.starts_with('\t')
        {
            break;
        }
    }
    count
}
336
337fn to_workflow_response(item: &ItemResponse, rev: Option<&RevisionResponse>) -> WorkflowResponse {
338 let meta = rev.map(|r| &r.metadata);
339 let get = |key: &str| -> String { meta.and_then(|m| m.get(key)).cloned().unwrap_or_default() };
340 let tags_str = get("tags");
341 let tags: Vec<String> = if tags_str.is_empty() {
342 Vec::new()
343 } else {
344 tags_str.split(',').map(|s| s.trim().to_string()).collect()
345 };
346 let steps: usize = get("steps").parse().unwrap_or(0);
347
348 let display_name = {
349 let n = get("display_name");
350 if n.is_empty() {
351 item.item_name.clone()
352 } else {
353 n
354 }
355 };
356
357 let definition = get("definition");
358 let triggers = extract_triggers(&definition);
359
360 WorkflowResponse {
361 kref: item.kref.clone(),
362 name: display_name,
363 item_name: item.item_name.clone(),
364 deprecated: item.deprecated,
365 created_at: item.created_at.clone(),
366 description: get("description"),
367 definition,
368 version: format!("{}", rev.map(|r| r.number).unwrap_or(0)),
369 tags,
370 steps,
371 revision_number: rev.map(|r| r.number).unwrap_or(0),
372 source: "custom".to_string(),
373 triggers,
374 }
375}
376
/// Replace each revision's inline `definition` metadata with the contents
/// of its `workflow.yaml` artifact when one exists and its backing file is
/// readable. Best-effort: any failure leaves the inline metadata intact.
async fn prefer_artifact_definitions(
    client: &super::kumiho_client::KumihoClient,
    revs: &mut HashMap<String, RevisionResponse>,
) {
    for rev in revs.values_mut() {
        if let Ok(artifact) = client
            .get_artifact_by_name(&rev.kref, "workflow.yaml")
            .await
        {
            // Artifact locations are stored as file:// URLs (see
            // persist_workflow_artifact); strip the scheme to get a path.
            let path = artifact
                .location
                .strip_prefix("file://")
                .unwrap_or(&artifact.location);
            if let Ok(yaml) = tokio::fs::read_to_string(path).await {
                rev.metadata.insert("definition".to_string(), yaml);
            }
        }
    }
}
403
/// Turn raw Kumiho items into enriched `WorkflowResponse`s.
///
/// Fast path: batch-fetch "published" revisions for all items, fall back
/// to "latest" for items without a published revision, then prefer
/// artifact YAML over inline metadata. If the batch call itself fails,
/// degrade to one sequential `get_published_or_latest` call per item.
async fn enrich_items(
    client: &super::kumiho_client::KumihoClient,
    items: Vec<ItemResponse>,
) -> Vec<WorkflowResponse> {
    // Only items of kind "workflow" are considered.
    let items: Vec<ItemResponse> = items.into_iter().filter(|i| i.kind == "workflow").collect();

    if items.is_empty() {
        return Vec::new();
    }

    let krefs: Vec<String> = items.iter().map(|i| i.kref.clone()).collect();

    if let Ok(mut rev_map) = client.batch_get_revisions(&krefs, "published").await {
        // Items with no published revision get their latest revision instead.
        let missing: Vec<String> = krefs
            .iter()
            .filter(|k| !rev_map.contains_key(*k))
            .cloned()
            .collect();
        let mut latest_map = if !missing.is_empty() {
            client
                .batch_get_revisions(&missing, "latest")
                .await
                .unwrap_or_default()
        } else {
            HashMap::new()
        };

        // Swap in workflow.yaml artifact contents where available.
        prefer_artifact_definitions(client, &mut rev_map).await;
        prefer_artifact_definitions(client, &mut latest_map).await;

        return items
            .iter()
            .map(|item| {
                let rev = rev_map
                    .get(&item.kref)
                    .or_else(|| latest_map.get(&item.kref));
                to_workflow_response(item, rev)
            })
            .collect();
    }

    // Batch endpoint failed — degrade to one round trip per item.
    let mut workflows = Vec::with_capacity(items.len());
    for item in &items {
        let rev = client.get_published_or_latest(&item.kref).await.ok();
        workflows.push(to_workflow_response(item, rev.as_ref()));
    }
    workflows
}
460
461fn to_run_summary(item: &ItemResponse, rev: Option<&RevisionResponse>) -> WorkflowRunSummary {
462 let meta = rev.map(|r| &r.metadata);
463 let get = |key: &str| -> String { meta.and_then(|m| m.get(key)).cloned().unwrap_or_default() };
464
465 let run_id_meta = get("run_id");
466 WorkflowRunSummary {
467 kref: item.kref.clone(),
468 run_id: if run_id_meta.is_empty() {
469 item.item_name.clone()
470 } else {
471 run_id_meta
472 },
473 workflow_name: {
474 let wn = get("workflow_name");
475 if wn.is_empty() { get("workflow") } else { wn }
476 },
477 status: get("status"),
478 started_at: get("started_at"),
479 completed_at: get("completed_at"),
480 steps_completed: get("steps_completed"),
481 steps_total: get("steps_total"),
482 error: get("error"),
483 workflow_item_kref: get("workflow_item_kref"),
484 workflow_revision_kref: get("workflow_revision_kref"),
485 }
486}
487
/// Rebuild per-step details from a run revision's metadata map.
///
/// Step state lives under `step_<id>` keys whose values are JSON objects;
/// `transcript` and `output_data` may themselves be JSON-encoded strings.
/// Values that fail to parse as JSON fall back to a substring scan for a
/// `"status"` field. HashMap iteration order is unspecified, so the
/// returned steps are in no particular order.
fn extract_steps_from_metadata(meta: &HashMap<String, String>) -> Vec<WorkflowStepDetail> {
    // Aggregate keys sharing the "step" prefix that are not step entries.
    const SKIP_KEYS: &[&str] = &["step_count", "steps_completed", "steps_total"];

    let mut steps = Vec::new();
    for (key, value) in meta {
        if SKIP_KEYS.contains(&key.as_str()) {
            continue;
        }
        if let Some(step_id) = key.strip_prefix("step_") {
            if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(value) {
                // Only JSON objects describe steps; other values are skipped.
                if !parsed.is_object() {
                    continue;
                }
                let skills = parsed
                    .get("skills")
                    .and_then(|v| v.as_array())
                    .map(|arr| {
                        arr.iter()
                            .filter_map(|s| s.as_str().map(|s| s.to_string()))
                            .collect::<Vec<_>>()
                    })
                    .unwrap_or_default();
                // transcript: a JSON-encoded string holding an array of
                // {speaker, content, round} objects.
                let transcript = parsed
                    .get("transcript")
                    .and_then(|v| v.as_str())
                    .and_then(|s| serde_json::from_str::<Vec<serde_json::Value>>(s).ok())
                    .map(|arr| {
                        arr.iter()
                            .map(|entry| TranscriptEntry {
                                speaker: entry
                                    .get("speaker")
                                    .and_then(|v| v.as_str())
                                    .unwrap_or("?")
                                    .to_string(),
                                content: entry
                                    .get("content")
                                    .and_then(|v| v.as_str())
                                    .unwrap_or("")
                                    .to_string(),
                                round: entry.get("round").and_then(|v| v.as_u64()).unwrap_or(0)
                                    as u32,
                            })
                            .collect::<Vec<_>>()
                    })
                    .unwrap_or_default();
                // output_data: either a nested JSON string or an inline object.
                let output_data = parsed.get("output_data").and_then(|v| {
                    let obj = if let Some(s) = v.as_str() {
                        serde_json::from_str::<serde_json::Value>(s).ok()
                    } else {
                        Some(v.clone())
                    };
                    obj.map(|o| ApprovalOutputData {
                        awaiting_approval: o.get("awaiting_approval").and_then(|v| v.as_bool()),
                        approval_message: o
                            .get("approval_message")
                            .and_then(|v| v.as_str())
                            .map(String::from),
                        approve_keywords: o
                            .get("approve_keywords")
                            .and_then(|v| v.as_array())
                            .map(|arr| {
                                arr.iter()
                                    .filter_map(|s| s.as_str().map(String::from))
                                    .collect()
                            })
                            .unwrap_or_default(),
                        reject_keywords: o
                            .get("reject_keywords")
                            .and_then(|v| v.as_array())
                            .map(|arr| {
                                arr.iter()
                                    .filter_map(|s| s.as_str().map(String::from))
                                    .collect()
                            })
                            .unwrap_or_default(),
                    })
                });
                steps.push(WorkflowStepDetail {
                    step_id: step_id.to_string(),
                    status: parsed
                        .get("status")
                        .and_then(|v| v.as_str())
                        .unwrap_or("unknown")
                        .to_string(),
                    agent_id: parsed
                        .get("agent_id")
                        .and_then(|v| v.as_str())
                        .unwrap_or("")
                        .to_string(),
                    agent_type: parsed
                        .get("agent_type")
                        .and_then(|v| v.as_str())
                        .unwrap_or("")
                        .to_string(),
                    role: parsed
                        .get("role")
                        .and_then(|v| v.as_str())
                        .unwrap_or("")
                        .to_string(),
                    template_name: parsed
                        .get("template_name")
                        .and_then(|v| v.as_str())
                        .unwrap_or("")
                        .to_string(),
                    output_preview: parsed
                        .get("output_preview")
                        .and_then(|v| v.as_str())
                        .unwrap_or("")
                        .to_string(),
                    artifact_path: parsed
                        .get("artifact_path")
                        .and_then(|v| v.as_str())
                        .unwrap_or("")
                        .to_string(),
                    skills,
                    transcript,
                    output_data,
                });
            } else if value.contains(r#""status""#) {
                // Non-JSON fallback: pull out just the status field.
                // 11 == length of the search pattern `"status": "`.
                let status = if let Some(start) = value.find(r#""status": ""#) {
                    let rest = &value[start + 11..];
                    rest.split('"').next().unwrap_or("unknown")
                } else {
                    "unknown"
                };
                steps.push(WorkflowStepDetail {
                    step_id: step_id.to_string(),
                    status: status.to_string(),
                    agent_id: String::new(),
                    agent_type: String::new(),
                    role: String::new(),
                    template_name: String::new(),
                    output_preview: String::new(),
                    artifact_path: String::new(),
                    skills: Vec::new(),
                    transcript: Vec::new(),
                    output_data: None,
                });
            }
        }
    }
    steps
}
639
640fn to_run_detail(item: &ItemResponse, rev: Option<&RevisionResponse>) -> WorkflowRunDetail {
641 let summary = to_run_summary(item, rev);
642 let steps = rev
643 .map(|r| extract_steps_from_metadata(&r.metadata))
644 .unwrap_or_default();
645 WorkflowRunDetail { summary, steps }
646}
647
// Home-relative directory scanned for builtin workflow YAML files.
const BUILTIN_WORKFLOWS_DIR: &str = ".construct/operator_mcp/workflow/builtins";
652
/// Scan the builtin workflows directory under the user's home for
/// `.yaml`/`.yml` files and turn each into a `WorkflowResponse` with
/// `source = "builtin"` and a `builtin://<slug>` kref. Returns an empty
/// list if the directory is missing or unreadable; unreadable files are
/// skipped silently.
fn discover_builtin_workflows() -> Vec<WorkflowResponse> {
    let home = directories::UserDirs::new()
        .map(|u| u.home_dir().to_path_buf())
        .unwrap_or_default();
    let builtins_dir = home.join(BUILTIN_WORKFLOWS_DIR);
    let Ok(entries) = std::fs::read_dir(&builtins_dir) else {
        return Vec::new();
    };

    let mut workflows = Vec::new();
    for entry in entries.flatten() {
        let path = entry.path();
        let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
        if ext != "yaml" && ext != "yml" {
            continue;
        }
        let Ok(content) = std::fs::read_to_string(&path) else {
            continue;
        };
        // Display name: the YAML `name:` field, else the file stem.
        let name = extract_yaml_field(&content, "name").unwrap_or_else(|| {
            path.file_stem()
                .unwrap_or_default()
                .to_string_lossy()
                .into_owned()
        });
        let description = extract_yaml_field(&content, "description").unwrap_or_default();
        let version = extract_yaml_field(&content, "version").unwrap_or_else(|| "1.0".into());
        let tags_str = extract_yaml_field(&content, "tags").unwrap_or_default();
        // Tags may be a flow-style list: [a, "b", 'c'].
        let tags: Vec<String> = if tags_str.is_empty() {
            Vec::new()
        } else {
            tags_str
                .trim_start_matches('[')
                .trim_end_matches(']')
                .split(',')
                .map(|s| s.trim().trim_matches('"').trim_matches('\'').to_string())
                .filter(|s| !s.is_empty())
                .collect()
        };
        let steps = count_yaml_steps(&content);
        let item_name = slugify(&name);

        let triggers = extract_triggers(&content);
        workflows.push(WorkflowResponse {
            kref: format!("builtin://{item_name}"),
            name,
            item_name,
            deprecated: false,
            created_at: None,
            description,
            definition: content,
            version,
            tags,
            steps,
            revision_number: 0,
            source: "builtin".to_string(),
            triggers,
        });
    }
    workflows
}
719
/// Extract a scalar top-level YAML field via a simple line scan.
///
/// Returns the first non-empty value of `field:` seen before the `steps:`
/// or `inputs:` section; surrounding double/single quotes are stripped.
/// No YAML parser is used, so nested keys with the same name can match if
/// they appear before those sections.
fn extract_yaml_field(content: &str, field: &str) -> Option<String> {
    for line in content.lines() {
        let trimmed = line.trim();
        // Stop before step/input definitions so nested keys don't match.
        if trimmed == "steps:" || trimmed == "inputs:" {
            return None;
        }
        let found = trimmed
            .strip_prefix(field)
            .and_then(|rest| rest.strip_prefix(':'))
            .map(|raw| raw.trim().trim_matches('"').trim_matches('\''))
            .filter(|v| !v.is_empty());
        if let Some(v) = found {
            return Some(v.to_string());
        }
    }
    None
}
741
/// Parse item-event triggers out of a workflow YAML `triggers:` section
/// using a line-based scan (no YAML parser). Each list item may carry
/// `on_kind`, `on_tag`, and `on_name_pattern` keys, either on the dash
/// line itself or on indented continuation lines. Entries without an
/// `on_kind` are dropped; a missing `on_tag` defaults to "ready".
fn extract_triggers(content: &str) -> Vec<WorkflowTrigger> {
    let mut triggers = Vec::new();
    let mut in_triggers = false;
    // Fields of the trigger entry currently being accumulated.
    let mut current_kind = String::new();
    let mut current_tag = String::new();
    let mut current_pattern = String::new();

    for line in content.lines() {
        let trimmed = line.trim();
        if trimmed == "triggers:" {
            in_triggers = true;
            continue;
        }
        if !in_triggers {
            continue;
        }
        // A non-indented, non-list, non-comment line ends the section.
        if !trimmed.is_empty()
            && !trimmed.starts_with('-')
            && !trimmed.starts_with('#')
            && !line.starts_with(' ')
            && !line.starts_with('\t')
        {
            break;
        }
        if trimmed.starts_with("- ") {
            // New list item: flush the previous entry if it had a kind.
            if !current_kind.is_empty() {
                triggers.push(WorkflowTrigger {
                    on_kind: std::mem::take(&mut current_kind),
                    on_tag: if current_tag.is_empty() {
                        "ready".to_string()
                    } else {
                        std::mem::take(&mut current_tag)
                    },
                    on_name_pattern: std::mem::take(&mut current_pattern),
                });
            }
            // The dash line may itself carry the first key/value pair.
            let after_dash = trimmed.strip_prefix("- ").unwrap_or("");
            if let Some((k, v)) = after_dash.split_once(':') {
                let k = k.trim();
                let v = v.trim().trim_matches('"').trim_matches('\'');
                match k {
                    "on_kind" => current_kind = v.to_string(),
                    "on_tag" => current_tag = v.to_string(),
                    "on_name_pattern" => current_pattern = v.to_string(),
                    _ => {}
                }
            }
            continue;
        }
        // Indented continuation line of the current entry.
        if let Some((k, v)) = trimmed.split_once(':') {
            let k = k.trim();
            let v = v.trim().trim_matches('"').trim_matches('\'');
            match k {
                "on_kind" => current_kind = v.to_string(),
                "on_tag" => current_tag = v.to_string(),
                "on_name_pattern" => current_pattern = v.to_string(),
                _ => {}
            }
        }
    }
    // Flush the trailing entry.
    if !current_kind.is_empty() {
        triggers.push(WorkflowTrigger {
            on_kind: current_kind,
            on_tag: if current_tag.is_empty() {
                "ready".to_string()
            } else {
                current_tag
            },
            on_name_pattern: current_pattern,
        });
    }
    triggers
}
824
/// Scan workflow YAML for `triggers:` entries carrying a `cron:` schedule.
///
/// Returns `(cron_expression, optional_timezone)` pairs; `timezone` / `tz`
/// keys attach to the entry being scanned. Mirrors the line-based scanning
/// of `extract_triggers` — no YAML parser is used. Entries without a
/// `cron` key produce nothing.
fn extract_cron_triggers(content: &str) -> Vec<(String, Option<String>)> {
    let mut results = Vec::new();
    let mut in_triggers = false;
    // Accumulator for the trigger entry currently being scanned.
    let mut pending_cron = String::new();
    let mut pending_tz: Option<String> = None;

    // Shared handling for one `key: value` pair inside a trigger entry.
    let apply = |key: &str, raw: &str, cron: &mut String, tz: &mut Option<String>| {
        let val = raw.trim().trim_matches('"').trim_matches('\'');
        if val.is_empty() {
            return;
        }
        match key.trim() {
            "cron" => *cron = val.to_string(),
            "timezone" | "tz" => *tz = Some(val.to_string()),
            _ => {}
        }
    };

    for line in content.lines() {
        let trimmed = line.trim();
        if trimmed == "triggers:" {
            in_triggers = true;
            continue;
        }
        if !in_triggers {
            continue;
        }
        // A non-indented, non-list, non-comment line ends the section.
        if !trimmed.is_empty()
            && !trimmed.starts_with('-')
            && !trimmed.starts_with('#')
            && !line.starts_with(' ')
            && !line.starts_with('\t')
        {
            break;
        }
        if let Some(after_dash) = trimmed.strip_prefix("- ") {
            // New list item: flush the previous entry if it had a cron.
            if !pending_cron.is_empty() {
                results.push((std::mem::take(&mut pending_cron), pending_tz.take()));
            }
            if let Some((k, v)) = after_dash.split_once(':') {
                apply(k, v, &mut pending_cron, &mut pending_tz);
            }
        } else if let Some((k, v)) = trimmed.split_once(':') {
            apply(k, v, &mut pending_cron, &mut pending_tz);
        }
    }
    // Flush the trailing entry.
    if !pending_cron.is_empty() {
        results.push((pending_cron, pending_tz));
    }
    results
}
888
/// Write the workflow YAML to `~/.construct/workflows/<slug>.r<N>.yaml`
/// and register it as a `workflow.yaml` artifact (file:// location) on the
/// given revision. Best-effort: failures are logged, never returned.
async fn persist_workflow_artifact(
    client: &KumihoClient,
    revision_kref: &str,
    revision_number: i32,
    workflow_name: &str,
    definition: &str,
) {
    let home = directories::UserDirs::new()
        .map(|u| u.home_dir().to_path_buf())
        .unwrap_or_default();
    let dir = home.join(".construct/workflows");
    let _ = tokio::fs::create_dir_all(&dir).await;

    // One file per revision number, so earlier revisions stay readable.
    let slug = slugify(workflow_name);
    let file_path = dir.join(format!("{slug}.r{revision_number}.yaml"));
    let location = format!("file://{}", file_path.display());

    if let Err(e) = tokio::fs::write(&file_path, definition).await {
        tracing::warn!("Failed to write workflow YAML for {workflow_name}: {e}");
        return;
    }

    if let Err(e) = client
        .create_artifact(revision_kref, "workflow.yaml", &location, HashMap::new())
        .await
    {
        tracing::warn!("Failed to create artifact for workflow {workflow_name}: {e}");
    } else {
        tracing::info!("Persisted workflow artifact: {location}");
    }
}
925
926fn sync_cron_for_workflow(state: &AppState, workflow_name: &str, definition: &str) {
927 let cron_triggers = extract_cron_triggers(definition);
928 let config = state.config.lock();
929
930 if let Err(e) = crate::cron::remove_workflow_cron_jobs(&config, workflow_name) {
932 tracing::warn!("Failed to remove old cron jobs for workflow {workflow_name}: {e}");
933 }
934
935 if cron_triggers.is_empty() {
936 return;
937 }
938
939 let wf_crons: Vec<(String, String, Option<String>)> = cron_triggers
940 .into_iter()
941 .map(|(expr, tz)| (workflow_name.to_string(), expr, tz))
942 .collect();
943
944 if let Err(e) = crate::cron::sync_workflow_cron_jobs(&config, &wf_crons) {
945 tracing::warn!("Failed to sync cron triggers for workflow {workflow_name}: {e}");
946 }
947}
948
949fn merge_with_builtins(mut kumiho_workflows: Vec<WorkflowResponse>) -> Vec<WorkflowResponse> {
955 let builtins = discover_builtin_workflows();
956 if builtins.is_empty() {
957 return kumiho_workflows;
958 }
959
960 let builtin_names: std::collections::HashSet<String> =
961 builtins.iter().map(|b| b.item_name.clone()).collect();
962
963 for wf in &mut kumiho_workflows {
965 if builtin_names.contains(&wf.item_name) {
966 wf.source = "builtin-modified".to_string();
967 }
968 }
969
970 let kumiho_names: std::collections::HashSet<String> = kumiho_workflows
972 .iter()
973 .map(|w| w.item_name.clone())
974 .collect();
975 for builtin in builtins {
976 if !kumiho_names.contains(&builtin.item_name) {
977 kumiho_workflows.push(builtin);
978 }
979 }
980
981 kumiho_workflows
982}
983
/// GET handler: list workflow definitions, merged with on-disk builtins.
///
/// Supports `q` (free-text search) and `include_deprecated`. Unfiltered
/// results are served from a short-lived cache when fresh; on upstream
/// failure a stale cache entry is preferred over surfacing the error, and
/// a 404 triggers best-effort provisioning of the project/space.
pub async fn handle_list_workflows(
    State(state): State<AppState>,
    headers: HeaderMap,
    Query(query): Query<WorkflowListQuery>,
) -> impl IntoResponse {
    if let Err(e) = require_auth(&state, &headers) {
        return e.into_response();
    }

    let client = build_kumiho_client(&state);
    let project = workflow_project(&state);
    let space_path = workflow_space_path(&state);

    // The cache only covers the unfiltered listing.
    if query.q.is_none() {
        if let Some(cached) = get_cached(query.include_deprecated) {
            return Json(serde_json::json!({ "workflows": cached })).into_response();
        }
    }

    let items_result = if let Some(ref q) = query.q {
        client
            .search_items(q, &project, "workflow", query.include_deprecated)
            .await
            .map(|results| results.into_iter().map(|sr| sr.item).collect::<Vec<_>>())
    } else {
        client
            .list_items(&space_path, query.include_deprecated)
            .await
    };

    match items_result {
        Ok(items) => {
            let workflows = merge_with_builtins(enrich_items(&client, items).await);
            if query.q.is_none() {
                set_cached(&workflows, query.include_deprecated);
            }
            Json(serde_json::json!({ "workflows": workflows })).into_response()
        }
        // 404: the project/space doesn't exist yet. Provision it
        // (best-effort) and return just the builtins.
        Err(ref e) if matches!(e, KumihoError::Api { status: 404, .. }) => {
            let _ = client.ensure_project(&project).await;
            let _ = client.ensure_space(&project, WORKFLOW_SPACE_NAME).await;
            let workflows = merge_with_builtins(Vec::new());
            Json(serde_json::json!({ "workflows": workflows })).into_response()
        }
        Err(e) => {
            // Upstream failure: serve a stale cached list if one exists,
            // regardless of its include_deprecated flag or age.
            if query.q.is_none() {
                let lock = WORKFLOW_CACHE.get_or_init(|| Mutex::new(None));
                let cache = lock.lock();
                if let Some(ref c) = *cache {
                    tracing::warn!("Workflows list failed, returning stale cache: {e}");
                    return Json(serde_json::json!({ "workflows": c.workflows })).into_response();
                }
            }
            kumiho_err(e).into_response()
        }
    }
}
1046
/// Parsed result of the operator's `validate_workflow` tool
/// (see `validate_via_operator`).
#[derive(Debug)]
struct ValidationOutcome {
    valid: bool,
    // Raw error/warning objects as returned by the validator.
    errors: Vec<serde_json::Value>,
    warnings: Vec<serde_json::Value>,
}
1058
/// Invoke the operator MCP `validate_workflow` tool (15 s timeout).
///
/// The tool result is double-encoded: the outer JSON is an MCP tool
/// response whose `content[0].text` holds the inner validation JSON with
/// `valid`, `errors`, and `warnings` fields.
///
/// `Err` means an infrastructure failure (registry missing, tool error,
/// timeout, parse failure) — a validation failure comes back as
/// `Ok` with `valid: false`.
async fn validate_via_operator(
    state: &AppState,
    args: serde_json::Map<String, serde_json::Value>,
) -> Result<ValidationOutcome, String> {
    let tool_name = format!(
        "{}__validate_workflow",
        crate::agent::operator::OPERATOR_SERVER_NAME
    );

    let registry = state
        .mcp_registry
        .as_ref()
        .ok_or_else(|| "MCP registry not available — operator not connected".to_string())?;

    let fut = registry.call_tool(&tool_name, serde_json::Value::Object(args));
    let result_str = match tokio::time::timeout(std::time::Duration::from_secs(15), fut).await {
        Ok(Ok(s)) => s,
        Ok(Err(e)) => return Err(format!("operator validate_workflow failed: {e:#}")),
        Err(_) => return Err("operator validate_workflow timed out (15s)".to_string()),
    };

    // Outer layer: the MCP tool response envelope.
    let outer: serde_json::Value = serde_json::from_str(&result_str)
        .map_err(|e| format!("validate_workflow: outer JSON parse failed: {e}"))?;

    let inner_text = outer
        .get("content")
        .and_then(|c| c.get(0))
        .and_then(|c0| c0.get("text"))
        .and_then(|t| t.as_str())
        .ok_or_else(|| "validate_workflow: missing content[0].text".to_string())?;

    // Inner layer: the validator's own JSON payload.
    let inner: serde_json::Value = serde_json::from_str(inner_text)
        .map_err(|e| format!("validate_workflow: inner JSON parse failed: {e}"))?;

    let valid = inner
        .get("valid")
        .and_then(|v| v.as_bool())
        .ok_or_else(|| "validate_workflow: missing `valid` field".to_string())?;
    let errors = inner
        .get("errors")
        .and_then(|v| v.as_array())
        .cloned()
        .unwrap_or_default();
    let warnings = inner
        .get("warnings")
        .and_then(|v| v.as_array())
        .cloned()
        .unwrap_or_default();

    Ok(ValidationOutcome {
        valid,
        errors,
        warnings,
    })
}
1118
1119fn validation_error_response(
1121 outcome: &ValidationOutcome,
1122 context: &str,
1123) -> (StatusCode, Json<serde_json::Value>) {
1124 (
1125 StatusCode::BAD_REQUEST,
1126 Json(serde_json::json!({
1127 "error": format!("Workflow validation failed: {context}"),
1128 "valid": false,
1129 "errors": outcome.errors,
1130 "warnings": outcome.warnings,
1131 })),
1132 )
1133}
1134
/// POST handler: create a new workflow definition.
///
/// Flow: validate the YAML via the operator (a validation *failure*
/// rejects the request with 400; a validator *infrastructure* error only
/// logs a warning and lets the save proceed), provision the project/space,
/// create the item + first revision, persist the YAML as an artifact, tag
/// the revision "published", then refresh the cache and cron triggers.
/// Returns 201 with the created workflow.
pub async fn handle_create_workflow(
    State(state): State<AppState>,
    headers: HeaderMap,
    Json(body): Json<CreateWorkflowBody>,
) -> impl IntoResponse {
    if let Err(e) = require_auth(&state, &headers) {
        return e.into_response();
    }

    let mut v_args = serde_json::Map::new();
    v_args.insert(
        "workflow_yaml".to_string(),
        serde_json::Value::String(body.definition.clone()),
    );
    match validate_via_operator(&state, v_args).await {
        Ok(outcome) if !outcome.valid => {
            return validation_error_response(&outcome, "cannot save invalid workflow")
                .into_response();
        }
        Ok(_) => {}
        Err(e) => {
            // Validator infrastructure problem: proceed without validation.
            tracing::warn!("create_workflow: validation skipped (infra error): {e}");
        }
    }

    let client = build_kumiho_client(&state);
    let project = workflow_project(&state);
    let space_path = workflow_space_path(&state);

    // Make sure the target project/space exist before creating the item.
    if let Err(e) = client.ensure_project(&project).await {
        return kumiho_err(e).into_response();
    }
    if let Err(e) = client.ensure_space(&project, WORKFLOW_SPACE_NAME).await {
        return kumiho_err(e).into_response();
    }

    let slug = slugify(&body.name);
    let item = match client
        .create_item(&space_path, &slug, "workflow", HashMap::new())
        .await
    {
        Ok(item) => item,
        Err(e) => return kumiho_err(e).into_response(),
    };

    // All workflow fields live in revision metadata (see workflow_metadata).
    let metadata = workflow_metadata(&body);
    let rev = match client.create_revision(&item.kref, metadata).await {
        Ok(rev) => rev,
        Err(e) => return kumiho_err(e).into_response(),
    };

    // Persist the YAML to disk as an artifact and publish (best-effort).
    persist_workflow_artifact(&client, &rev.kref, rev.number, &body.name, &body.definition).await;
    let _ = client.tag_revision(&rev.kref, "published").await;

    invalidate_cache();
    sync_cron_for_workflow(&state, &body.name, &body.definition);

    let workflow = to_workflow_response(&item, Some(&rev));
    (
        StatusCode::CREATED,
        Json(serde_json::json!({ "workflow": workflow })),
    )
        .into_response()
}
1203
/// PUT handler: update an existing workflow by creating a new revision.
///
/// `kref` arrives as a path parameter without its scheme; the handler
/// re-prefixes it with `kref://` before calling Kumiho. Validation policy
/// matches create: invalid YAML is rejected, validator infrastructure
/// errors are tolerated.
pub async fn handle_update_workflow(
    State(state): State<AppState>,
    headers: HeaderMap,
    Path(kref): Path<String>,
    Json(body): Json<CreateWorkflowBody>,
) -> impl IntoResponse {
    if let Err(e) = require_auth(&state, &headers) {
        return e.into_response();
    }

    let mut v_args = serde_json::Map::new();
    v_args.insert(
        "workflow_yaml".to_string(),
        serde_json::Value::String(body.definition.clone()),
    );
    match validate_via_operator(&state, v_args).await {
        Ok(outcome) if !outcome.valid => {
            return validation_error_response(&outcome, "cannot save invalid workflow")
                .into_response();
        }
        Ok(_) => {}
        Err(e) => {
            // Validator infrastructure problem: proceed without validation.
            tracing::warn!("update_workflow: validation skipped (infra error): {e}");
        }
    }

    // Path parameter is the kref without its scheme — restore it.
    let kref = format!("kref://{kref}");
    let client = build_kumiho_client(&state);

    let metadata = workflow_metadata(&body);
    let rev = match client.create_revision(&kref, metadata).await {
        Ok(rev) => rev,
        Err(e) => return kumiho_err(e).into_response(),
    };

    // Persist the YAML to disk as an artifact and publish (best-effort).
    persist_workflow_artifact(&client, &rev.kref, rev.number, &body.name, &body.definition).await;
    let _ = client.tag_revision(&rev.kref, "published").await;

    // Re-list (including deprecated) to find the item for the response.
    let items = match client.list_items(&workflow_space_path(&state), true).await {
        Ok(items) => items,
        Err(e) => return kumiho_err(e).into_response(),
    };

    invalidate_cache();
    sync_cron_for_workflow(&state, &body.name, &body.definition);

    let item = items.iter().find(|i| i.kref == kref);
    match item {
        Some(item) => {
            let workflow = to_workflow_response(item, Some(&rev));
            Json(serde_json::json!({ "workflow": workflow })).into_response()
        }
        None => {
            // Listing didn't include the item — synthesize a minimal one
            // so the response still carries the new revision.
            let fallback = ItemResponse {
                kref: kref.clone(),
                name: body.name.clone(),
                item_name: body.name.clone(),
                kind: "workflow".to_string(),
                deprecated: false,
                created_at: None,
                metadata: HashMap::new(),
            };
            let workflow = to_workflow_response(&fallback, Some(&rev));
            Json(serde_json::json!({ "workflow": workflow })).into_response()
        }
    }
}
1274
1275pub async fn handle_deprecate_workflow(
1277 State(state): State<AppState>,
1278 headers: HeaderMap,
1279 Json(body): Json<DeprecateBody>,
1280) -> impl IntoResponse {
1281 if let Err(e) = require_auth(&state, &headers) {
1282 return e.into_response();
1283 }
1284
1285 let kref = body.kref.clone();
1286 let client = build_kumiho_client(&state);
1287
1288 match client.deprecate_item(&kref, body.deprecated).await {
1289 Ok(item) => {
1290 invalidate_cache();
1291 let rev = client.get_published_or_latest(&kref).await.ok();
1292
1293 if body.deprecated {
1295 if let Some(item_segment) = kref.split('/').last() {
1297 let workflow_name = item_segment
1298 .rsplit_once('.')
1299 .map(|(name, _kind)| name)
1300 .unwrap_or(item_segment);
1301 let config = state.config.lock();
1302 if let Err(e) = crate::cron::remove_workflow_cron_jobs(&config, workflow_name) {
1303 tracing::warn!("Failed to remove cron jobs for deprecated workflow: {e}");
1304 }
1305 }
1306 } else if let Some(ref rev) = rev {
1307 if let Some(definition) = rev.metadata.get("definition") {
1309 sync_cron_for_workflow(&state, &item.item_name, definition);
1310 }
1311 }
1312
1313 let workflow = to_workflow_response(&item, rev.as_ref());
1314 Json(serde_json::json!({ "workflow": workflow })).into_response()
1315 }
1316 Err(e) => kumiho_err(e).into_response(),
1317 }
1318}
1319
1320pub async fn handle_delete_workflow(
1322 State(state): State<AppState>,
1323 headers: HeaderMap,
1324 Path(kref): Path<String>,
1325) -> impl IntoResponse {
1326 if let Err(e) = require_auth(&state, &headers) {
1327 return e.into_response();
1328 }
1329
1330 let kref = format!("kref://{kref}");
1331 let client = build_kumiho_client(&state);
1332
1333 match client.delete_item(&kref).await {
1334 Ok(()) => {
1335 invalidate_cache();
1336
1337 if let Some(item_segment) = kref.split('/').last() {
1341 let workflow_name = item_segment
1342 .rsplit_once('.')
1343 .map(|(name, _kind)| name)
1344 .unwrap_or(item_segment);
1345 let config = state.config.lock();
1346 if let Err(e) = crate::cron::remove_workflow_cron_jobs(&config, workflow_name) {
1347 tracing::warn!("Failed to remove cron jobs for deleted workflow: {e}");
1348 }
1349 }
1350
1351 StatusCode::NO_CONTENT.into_response()
1352 }
1353 Err(e) => kumiho_err(e).into_response(),
1354 }
1355}
1356
1357pub async fn handle_run_workflow(
1362 State(state): State<AppState>,
1363 headers: HeaderMap,
1364 Path(name): Path<String>,
1365 body: Option<Json<RunWorkflowBody>>,
1366) -> impl IntoResponse {
1367 if let Err(e) = require_auth(&state, &headers) {
1368 return e.into_response();
1369 }
1370
1371 let inputs = body
1372 .as_ref()
1373 .map(|b| b.inputs.clone())
1374 .unwrap_or(serde_json::Value::Object(Default::default()));
1375 let cwd = body.as_ref().and_then(|b| b.cwd.clone());
1376
1377 let mut v_args = serde_json::Map::new();
1381 v_args.insert(
1382 "workflow".to_string(),
1383 serde_json::Value::String(name.clone()),
1384 );
1385 if let Some(ref c) = cwd {
1386 v_args.insert("cwd".to_string(), serde_json::Value::String(c.clone()));
1387 }
1388 match validate_via_operator(&state, v_args).await {
1389 Ok(outcome) if !outcome.valid => {
1390 return validation_error_response(&outcome, "cannot dispatch invalid workflow")
1391 .into_response();
1392 }
1393 Ok(_) => {}
1394 Err(e) => {
1395 tracing::warn!("run_workflow: validation skipped (infra error): {e}");
1396 }
1397 }
1398
1399 let run_id = uuid::Uuid::new_v4().to_string();
1400 let now = chrono::Utc::now().to_rfc3339();
1401
1402 let client = build_kumiho_client(&state);
1403 let project = workflow_project(&state);
1404 let requests_space_path = workflow_run_requests_space_path(&state);
1405
1406 let _ = client.ensure_project(&project).await;
1408 let _ = client
1409 .ensure_space(&project, WORKFLOW_RUN_REQUESTS_SPACE_NAME)
1410 .await;
1411
1412 let mut metadata = HashMap::new();
1413 metadata.insert("workflow_name".to_string(), name.clone());
1414 metadata.insert("run_id".to_string(), run_id.clone());
1415 metadata.insert("inputs".to_string(), inputs.to_string());
1416 metadata.insert("cwd".to_string(), cwd.unwrap_or_default());
1417 metadata.insert("trigger_source".to_string(), "api".to_string());
1418 metadata.insert("requested_at".to_string(), now);
1419
1420 let item_name = format!("run-{}", &run_id[..run_id.len().min(12)]);
1421
1422 match client
1423 .create_item(
1424 &requests_space_path,
1425 &item_name,
1426 "workflow-run-request",
1427 metadata.clone(),
1428 )
1429 .await
1430 {
1431 Ok(item) => {
1432 if let Ok(rev) = client.create_revision(&item.kref, metadata).await {
1433 let _ = client.tag_revision(&rev.kref, "pending").await;
1434 }
1435 (
1436 StatusCode::OK,
1437 Json(serde_json::json!({
1438 "run_id": run_id,
1439 "workflow": name,
1440 "status": "pending",
1441 })),
1442 )
1443 .into_response()
1444 }
1445 Err(e) => {
1446 tracing::warn!("Failed to create workflow run request: {e}");
1447 (
1448 StatusCode::INTERNAL_SERVER_ERROR,
1449 Json(serde_json::json!({
1450 "error": format!("Failed to create run request: {e}")
1451 })),
1452 )
1453 .into_response()
1454 }
1455 }
1456}
1457
1458pub async fn handle_get_workflow_by_revision(
1465 State(state): State<AppState>,
1466 headers: HeaderMap,
1467 Path(kref): Path<String>,
1468) -> impl IntoResponse {
1469 if let Err(e) = require_auth(&state, &headers) {
1470 return e.into_response();
1471 }
1472
1473 let revision_kref = if kref.starts_with("kref://") {
1474 kref.clone()
1475 } else {
1476 format!("kref://{kref}")
1477 };
1478
1479 let client = build_kumiho_client(&state);
1480
1481 let mut rev = match client.get_revision(&revision_kref).await {
1482 Ok(r) => r,
1483 Err(e) => return kumiho_err(e).into_response(),
1484 };
1485
1486 if let Ok(artifact) = client
1492 .get_artifact_by_name(&rev.kref, "workflow.yaml")
1493 .await
1494 {
1495 let path = artifact
1496 .location
1497 .strip_prefix("file://")
1498 .unwrap_or(&artifact.location);
1499 if let Ok(yaml) = tokio::fs::read_to_string(path).await {
1500 rev.metadata.insert("definition".to_string(), yaml);
1501 }
1502 }
1503
1504 let item_name = rev
1507 .item_kref
1508 .rsplit('/')
1509 .next()
1510 .map(|seg| {
1511 seg.rsplit_once('.')
1512 .map(|(n, _)| n)
1513 .unwrap_or(seg)
1514 .to_string()
1515 })
1516 .unwrap_or_default();
1517
1518 let item = ItemResponse {
1519 kref: rev.item_kref.clone(),
1520 name: item_name.clone(),
1521 item_name,
1522 kind: "workflow".to_string(),
1523 deprecated: false,
1524 created_at: rev.created_at.clone(),
1525 metadata: HashMap::new(),
1526 };
1527
1528 let workflow = to_workflow_response(&item, Some(&rev));
1529 Json(serde_json::json!({ "workflow": workflow })).into_response()
1530}
1531
1532pub async fn handle_list_workflow_runs(
1536 State(state): State<AppState>,
1537 headers: HeaderMap,
1538 Query(query): Query<WorkflowRunsQuery>,
1539) -> impl IntoResponse {
1540 if let Err(e) = require_auth(&state, &headers) {
1541 return e.into_response();
1542 }
1543
1544 let client = build_kumiho_client(&state);
1545 let project = workflow_project(&state);
1546 let runs_space = workflow_runs_space_path(&state);
1547
1548 match client.list_items(&runs_space, false).await {
1549 Ok(mut items) => {
1550 items.retain(|i| i.kind == "workflow_run");
1552
1553 if let Some(ref wf_name) = query.workflow {
1554 items.retain(|item| {
1555 item.metadata
1556 .get("workflow_name")
1557 .or_else(|| item.metadata.get("workflow"))
1558 .map(|n| n == wf_name)
1559 .unwrap_or(false)
1560 });
1561 }
1562
1563 items.sort_by(|a, b| {
1564 let a_time = a.created_at.as_deref().unwrap_or("");
1565 let b_time = b.created_at.as_deref().unwrap_or("");
1566 b_time.cmp(a_time)
1567 });
1568 items.truncate(query.limit);
1569
1570 let krefs: Vec<String> = items.iter().map(|i| i.kref.clone()).collect();
1571 let rev_map = client
1572 .batch_get_revisions(&krefs, "latest")
1573 .await
1574 .unwrap_or_default();
1575
1576 let runs: Vec<WorkflowRunSummary> = items
1577 .iter()
1578 .map(|item| to_run_summary(item, rev_map.get(&item.kref)))
1579 .collect();
1580
1581 Json(serde_json::json!({ "runs": runs, "count": runs.len() })).into_response()
1582 }
1583 Err(ref e) if matches!(e, KumihoError::Api { status: 404, .. }) => {
1584 let _ = client.ensure_project(&project).await;
1585 let _ = client
1586 .ensure_space(&project, WORKFLOW_RUNS_SPACE_NAME)
1587 .await;
1588 Json(serde_json::json!({ "runs": [], "count": 0 })).into_response()
1589 }
1590 Err(e) => {
1591 let msg = format!("Failed to fetch workflow runs: {e}");
1592 (
1593 StatusCode::SERVICE_UNAVAILABLE,
1594 Json(serde_json::json!({ "error": msg })),
1595 )
1596 .into_response()
1597 }
1598 }
1599}
1600
1601pub async fn handle_get_workflow_run(
1603 State(state): State<AppState>,
1604 headers: HeaderMap,
1605 Path(run_id): Path<String>,
1606) -> impl IntoResponse {
1607 if let Err(e) = require_auth(&state, &headers) {
1608 return e.into_response();
1609 }
1610
1611 let client = build_kumiho_client(&state);
1612 let project = workflow_project(&state);
1613 let runs_space = workflow_runs_space_path(&state);
1614
1615 let run_id_prefix = &run_id[..run_id.len().min(12)];
1618
1619 if let Ok(items) = client
1623 .list_items_filtered(&runs_space, run_id_prefix, false)
1624 .await
1625 {
1626 let run_id_lower = run_id.to_lowercase();
1628 let prefix_lower = run_id_lower[..run_id_lower.len().min(12)].to_string();
1629 if let Some(item) = items.iter().find(|i| {
1630 i.kind == "workflow_run" && i.item_name.to_lowercase().contains(&prefix_lower)
1631 }) {
1632 let rev = client.get_latest_revision(&item.kref).await.ok();
1633 let detail = to_run_detail(item, rev.as_ref());
1634 return Json(serde_json::json!({ "run": detail })).into_response();
1635 }
1636 }
1637
1638 if let Ok(results) = client
1641 .search_items(&run_id, &project, "workflow_run", false)
1642 .await
1643 {
1644 if let Some(sr) = results.first() {
1645 let rev = client.get_latest_revision(&sr.item.kref).await.ok();
1646 let detail = to_run_detail(&sr.item, rev.as_ref());
1647 return Json(serde_json::json!({ "run": detail })).into_response();
1648 }
1649 }
1650
1651 match client.list_items(&runs_space, false).await {
1653 Ok(items) => {
1654 let run_id_lower = run_id.to_lowercase();
1655 let found = items.iter().find(|item| {
1656 if item.kind != "workflow_run" {
1657 return false;
1658 }
1659 if let Some(meta_run_id) = item.metadata.get("run_id") {
1661 if meta_run_id == &run_id {
1662 return true;
1663 }
1664 }
1665 let prefix = &run_id_lower[..run_id_lower.len().min(12)];
1667 item.item_name.to_lowercase().contains(prefix)
1668 });
1669
1670 match found {
1671 Some(item) => {
1672 let rev = client.get_latest_revision(&item.kref).await.ok();
1673 let detail = to_run_detail(item, rev.as_ref());
1674 Json(serde_json::json!({ "run": detail })).into_response()
1675 }
1676 None => (
1677 StatusCode::NOT_FOUND,
1678 Json(serde_json::json!({ "error": format!("Run '{run_id}' not found") })),
1679 )
1680 .into_response(),
1681 }
1682 }
1683 Err(e) => {
1684 let msg = format!("Kumiho error looking up run '{run_id}': {e}");
1685 tracing::warn!("{msg}");
1686 (
1687 StatusCode::SERVICE_UNAVAILABLE,
1688 Json(serde_json::json!({ "error": msg })),
1689 )
1690 .into_response()
1691 }
1692 }
1693}
1694
1695pub async fn handle_delete_workflow_run(
1701 State(state): State<AppState>,
1702 headers: HeaderMap,
1703 Path(run_id): Path<String>,
1704) -> impl IntoResponse {
1705 if let Err(e) = require_auth(&state, &headers) {
1706 return e.into_response();
1707 }
1708
1709 let client = build_kumiho_client(&state);
1710 let runs_space = workflow_runs_space_path(&state);
1711
1712 let run_id_prefix = &run_id[..run_id.len().min(12)];
1714
1715 let kref = if let Ok(items) = client
1716 .list_items_filtered(&runs_space, run_id_prefix, false)
1717 .await
1718 {
1719 let run_id_lower = run_id.to_lowercase();
1720 let prefix_lower = run_id_lower[..run_id_lower.len().min(12)].to_string();
1721 items
1722 .iter()
1723 .find(|i| {
1724 i.kind == "workflow_run" && i.item_name.to_lowercase().contains(&prefix_lower)
1725 })
1726 .map(|i| i.kref.clone())
1727 } else {
1728 None
1729 };
1730
1731 let kref = match kref {
1732 Some(k) => k,
1733 None => {
1734 return (
1735 StatusCode::NOT_FOUND,
1736 Json(serde_json::json!({ "error": format!("Run '{run_id}' not found") })),
1737 )
1738 .into_response();
1739 }
1740 };
1741
1742 match client.delete_item(&kref).await {
1743 Ok(()) => {
1744 cleanup_local_run_files(&run_id).await;
1745 StatusCode::NO_CONTENT.into_response()
1746 }
1747 Err(e) => {
1748 let msg = format!("Failed to delete run '{run_id}': {e}");
1749 tracing::warn!("{msg}");
1750 kumiho_err(e).into_response()
1751 }
1752 }
1753}
1754
1755async fn cleanup_local_run_files(run_id: &str) {
1762 let Some(user_dirs) = directories::UserDirs::new() else {
1763 return;
1764 };
1765 let home = user_dirs.home_dir().to_path_buf();
1766
1767 let checkpoint = home.join(format!(".construct/workflow_checkpoints/{run_id}.json"));
1768 if let Err(e) = tokio::fs::remove_file(&checkpoint).await {
1769 if e.kind() != std::io::ErrorKind::NotFound {
1770 tracing::warn!("Failed to remove checkpoint {}: {e}", checkpoint.display());
1771 }
1772 }
1773
1774 let artifacts_root = home.join(".construct/artifacts");
1775 let mut entries = match tokio::fs::read_dir(&artifacts_root).await {
1776 Ok(e) => e,
1777 Err(_) => return,
1778 };
1779 while let Ok(Some(entry)) = entries.next_entry().await {
1780 let candidate = entry.path().join(run_id);
1781 if tokio::fs::metadata(&candidate).await.is_ok() {
1782 if let Err(e) = tokio::fs::remove_dir_all(&candidate).await {
1783 tracing::warn!(
1784 "Failed to remove artifacts dir {}: {e}",
1785 candidate.display()
1786 );
1787 }
1788 }
1789 }
1790}
1791
/// POST /workflow-runs/:run_id/approve — resolve a human-approval gate.
///
/// Claims the pending approval from the in-memory registry to recover the
/// run's working directory, then calls the operator MCP server's
/// `resume_workflow` tool (30s timeout) with the decision. On success a
/// `human_approval_resolved` event is broadcast to listeners.
///
/// NOTE(review): when no registry entry exists (e.g. after a gateway
/// restart), cwd falls back to "/tmp" and the resume is attempted anyway —
/// confirm the operator tolerates that fallback cwd.
pub async fn handle_approve_workflow_run(
    State(state): State<AppState>,
    headers: HeaderMap,
    Path(run_id): Path<String>,
    Json(body): Json<ApproveWorkflowBody>,
) -> impl IntoResponse {
    if let Err(e) = require_auth(&state, &headers) {
        return e.into_response();
    }

    let approved = body.approved;
    let feedback = body.feedback.unwrap_or_default();

    // Claim (consume) the registry entry so the same approval cannot be
    // resolved twice; a missing entry is handled below, not an error.
    let claimed = state.approval_registry.try_claim(&run_id);
    let cwd = claimed
        .as_ref()
        .map(|a| a.cwd.clone())
        .unwrap_or_else(|| "/tmp".to_string());

    if claimed.is_none() {
        tracing::info!(
            "approve_workflow_run: no registry entry for run_id={run_id} (gateway restart?), \
             calling resume_workflow directly"
        );
    }

    // Build the operator tool call: <server>__resume_workflow.
    let tool_name = format!(
        "{}__resume_workflow",
        crate::agent::operator::OPERATOR_SERVER_NAME
    );
    let mut tool_args = serde_json::Map::new();
    tool_args.insert(
        "run_id".to_string(),
        serde_json::Value::String(run_id.clone()),
    );
    tool_args.insert("approved".to_string(), serde_json::Value::Bool(approved));
    tool_args.insert(
        "response".to_string(),
        serde_json::Value::String(feedback.clone()),
    );
    tool_args.insert("cwd".to_string(), serde_json::Value::String(cwd));

    // Call the operator with a hard 30s timeout so a hung resume cannot
    // stall the HTTP request indefinitely.
    let mcp_result = if let Some(ref registry) = state.mcp_registry {
        let mcp_future = registry.call_tool(&tool_name, serde_json::Value::Object(tool_args));
        match tokio::time::timeout(std::time::Duration::from_secs(30), mcp_future).await {
            Ok(Ok(result_str)) => Ok(result_str),
            Ok(Err(e)) => Err(format!("operator tool call failed: {e:#}")),
            Err(_) => Err("operator tool call timed out (30s)".to_string()),
        }
    } else {
        Err("MCP registry not available — operator not connected".to_string())
    };

    match mcp_result {
        Ok(_) => {
            // Broadcast to event listeners; delivery is best-effort.
            let _ = state.event_tx.send(serde_json::json!({
                "type": "human_approval_resolved",
                "run_id": run_id,
                "approved": approved,
                "timestamp": chrono::Utc::now().to_rfc3339(),
            }));

            (
                StatusCode::OK,
                Json(serde_json::json!({
                    "status": "ok",
                    "message": if approved { "Workflow approved" } else { "Workflow rejected" },
                    "run_id": run_id,
                    "approved": approved,
                })),
            )
                .into_response()
        }
        Err(e) => {
            tracing::warn!("approve_workflow_run: failed for run_id={run_id}: {e}");
            (
                StatusCode::BAD_GATEWAY,
                Json(serde_json::json!({
                    "error": format!("Failed to resume workflow: {e}")
                })),
            )
                .into_response()
        }
    }
}
1889
/// Request body for `handle_approve_workflow_run`.
#[derive(Deserialize)]
pub struct ApproveWorkflowBody {
    // true approves the paused workflow; false rejects it.
    pub approved: bool,
    // Optional reviewer feedback, forwarded to the operator as `response`.
    pub feedback: Option<String>,
}
1895
1896pub async fn handle_retry_workflow_run(
1903 State(state): State<AppState>,
1904 headers: HeaderMap,
1905 Path(run_id): Path<String>,
1906 body: Option<Json<RetryWorkflowBody>>,
1907) -> impl IntoResponse {
1908 if let Err(e) = require_auth(&state, &headers) {
1909 return e.into_response();
1910 }
1911
1912 let cwd = body
1913 .and_then(|Json(b)| b.cwd)
1914 .unwrap_or_else(|| "/tmp".to_string());
1915
1916 let tool_name = format!(
1917 "{}__retry_workflow",
1918 crate::agent::operator::OPERATOR_SERVER_NAME
1919 );
1920 let mut tool_args = serde_json::Map::new();
1921 tool_args.insert(
1922 "run_id".to_string(),
1923 serde_json::Value::String(run_id.clone()),
1924 );
1925 tool_args.insert("cwd".to_string(), serde_json::Value::String(cwd));
1926
1927 let mcp_result = if let Some(ref registry) = state.mcp_registry {
1928 let mcp_future = registry.call_tool(&tool_name, serde_json::Value::Object(tool_args));
1929 match tokio::time::timeout(std::time::Duration::from_secs(30), mcp_future).await {
1930 Ok(Ok(result_str)) => Ok(result_str),
1931 Ok(Err(e)) => Err(format!("operator tool call failed: {e:#}")),
1932 Err(_) => Err("operator tool call timed out (30s)".to_string()),
1933 }
1934 } else {
1935 Err("MCP registry not available — operator not connected".to_string())
1936 };
1937
1938 match mcp_result {
1939 Ok(result_str) => {
1940 let _ = state.event_tx.send(serde_json::json!({
1941 "type": "workflow_retry",
1942 "run_id": run_id,
1943 "timestamp": chrono::Utc::now().to_rfc3339(),
1944 }));
1945 let payload = serde_json::from_str::<serde_json::Value>(&result_str)
1946 .unwrap_or_else(|_| serde_json::json!({"raw": result_str}));
1947 (StatusCode::OK, Json(payload)).into_response()
1948 }
1949 Err(e) => {
1950 tracing::warn!("retry_workflow_run: failed for run_id={run_id}: {e}");
1951 (
1952 StatusCode::BAD_GATEWAY,
1953 Json(serde_json::json!({ "error": format!("Failed to retry workflow: {e}") })),
1954 )
1955 .into_response()
1956 }
1957 }
1958}
1959
/// Request body for `handle_retry_workflow_run`.
#[derive(Deserialize)]
pub struct RetryWorkflowBody {
    // Working directory for the retried run; defaults to "/tmp" when absent.
    pub cwd: Option<String>,
}
1964
1965pub async fn handle_agent_activity(
1971 State(state): State<AppState>,
1972 headers: HeaderMap,
1973 Path(agent_id): Path<String>,
1974 Query(query): Query<AgentActivityQuery>,
1975) -> impl IntoResponse {
1976 if let Err(e) = require_auth(&state, &headers) {
1977 return e.into_response();
1978 }
1979
1980 let runlogs_dir =
1981 std::path::PathBuf::from(std::env::var("HOME").unwrap_or_else(|_| "/tmp".into()))
1982 .join(".construct/operator_mcp/runlogs");
1983 let path = runlogs_dir.join(format!("{agent_id}.jsonl"));
1984
1985 if !path.exists() {
1986 return (
1987 StatusCode::NOT_FOUND,
1988 Json(serde_json::json!({ "error": "No run log found for this agent" })),
1989 )
1990 .into_response();
1991 }
1992
1993 let content = match std::fs::read_to_string(&path) {
1994 Ok(c) => c,
1995 Err(e) => {
1996 return (
1997 StatusCode::INTERNAL_SERVER_ERROR,
1998 Json(serde_json::json!({ "error": format!("Failed to read log: {e}") })),
1999 )
2000 .into_response();
2001 }
2002 };
2003
2004 let view = query.view.as_deref().unwrap_or("summary");
2005 let limit = query.limit.unwrap_or(100).min(500) as usize;
2006
2007 let entries: Vec<serde_json::Value> = content
2008 .lines()
2009 .filter_map(|line| serde_json::from_str(line).ok())
2010 .collect();
2011
2012 match view {
2013 "tool_calls" => {
2014 let tools: Vec<&serde_json::Value> = entries
2016 .iter()
2017 .filter(|e| e.get("kind").and_then(|v| v.as_str()) == Some("tool_call"))
2018 .collect();
2019 let total = tools.len();
2020 let slice: Vec<_> = tools.into_iter().rev().take(limit).collect();
2021 Json(serde_json::json!({
2022 "agent_id": agent_id,
2023 "view": "tool_calls",
2024 "total": total,
2025 "entries": slice,
2026 }))
2027 .into_response()
2028 }
2029 "messages" => {
2030 let msgs: Vec<&serde_json::Value> = entries
2032 .iter()
2033 .filter(|e| {
2034 let kind = e.get("kind").and_then(|v| v.as_str()).unwrap_or("");
2035 kind == "message" || kind == "user_message"
2036 })
2037 .collect();
2038 let total = msgs.len();
2039 let slice: Vec<_> = msgs.into_iter().rev().take(limit).collect();
2040 Json(serde_json::json!({
2041 "agent_id": agent_id,
2042 "view": "messages",
2043 "total": total,
2044 "entries": slice,
2045 }))
2046 .into_response()
2047 }
2048 "errors" => {
2049 let errs: Vec<&serde_json::Value> = entries
2050 .iter()
2051 .filter(|e| {
2052 let kind = e.get("kind").and_then(|v| v.as_str()).unwrap_or("");
2053 kind == "error"
2054 || kind == "turn_failed"
2055 || e.get("status").and_then(|v| v.as_str()) == Some("failed")
2056 })
2057 .collect();
2058 Json(serde_json::json!({
2059 "agent_id": agent_id,
2060 "view": "errors",
2061 "total": errs.len(),
2062 "entries": errs,
2063 }))
2064 .into_response()
2065 }
2066 "full" => {
2067 let total = entries.len();
2069 let slice: Vec<_> = entries.into_iter().rev().take(limit).collect();
2070 Json(serde_json::json!({
2071 "agent_id": agent_id,
2072 "view": "full",
2073 "total": total,
2074 "entries": slice,
2075 }))
2076 .into_response()
2077 }
2078 _ => {
2079 let header = entries.first().cloned().unwrap_or_default();
2081 let tool_count = entries
2082 .iter()
2083 .filter(|e| e.get("kind").and_then(|v| v.as_str()) == Some("tool_call"))
2084 .count();
2085 let error_count = entries
2086 .iter()
2087 .filter(|e| {
2088 let kind = e.get("kind").and_then(|v| v.as_str()).unwrap_or("");
2089 kind == "error" || kind == "turn_failed"
2090 })
2091 .count();
2092 let last_message = entries
2093 .iter()
2094 .rev()
2095 .find(|e| e.get("kind").and_then(|v| v.as_str()) == Some("message"))
2096 .and_then(|e| e.get("text").and_then(|v| v.as_str()))
2097 .unwrap_or("");
2098 let last_msg_truncated = if last_message.len() > 5000 {
2100 &last_message[..5000]
2101 } else {
2102 last_message
2103 };
2104 let recent_tools: Vec<_> = entries
2106 .iter()
2107 .filter(|e| e.get("kind").and_then(|v| v.as_str()) == Some("tool_call"))
2108 .rev()
2109 .take(20)
2110 .cloned()
2111 .collect();
2112 let mut input_tokens: u64 = 0;
2114 let mut output_tokens: u64 = 0;
2115 let mut total_cost: f64 = 0.0;
2116 for e in &entries {
2117 if e.get("kind").and_then(|v| v.as_str()) == Some("turn_completed") {
2118 if let Some(usage) = e.get("usage") {
2119 input_tokens += usage
2120 .get("inputTokens")
2121 .and_then(|v| v.as_u64())
2122 .unwrap_or(0);
2123 output_tokens += usage
2124 .get("outputTokens")
2125 .and_then(|v| v.as_u64())
2126 .unwrap_or(0);
2127 total_cost += usage
2128 .get("totalCostUsd")
2129 .and_then(|v| v.as_f64())
2130 .unwrap_or(0.0);
2131 }
2132 }
2133 }
2134 Json(serde_json::json!({
2135 "agent_id": agent_id,
2136 "view": "summary",
2137 "title": header.get("title").and_then(|v| v.as_str()).unwrap_or(""),
2138 "agent_type": header.get("agent_type").and_then(|v| v.as_str()).unwrap_or(""),
2139 "total_events": entries.len(),
2140 "tool_call_count": tool_count,
2141 "error_count": error_count,
2142 "last_message": last_msg_truncated,
2143 "recent_tools": recent_tools,
2144 "usage": {
2145 "input_tokens": input_tokens,
2146 "output_tokens": output_tokens,
2147 "total_cost_usd": total_cost,
2148 },
2149 }))
2150 .into_response()
2151 }
2152 }
2153}
2154
/// Query parameters for `handle_agent_activity`.
#[derive(Deserialize)]
pub struct AgentActivityQuery {
    // Projection: "tool_calls" | "messages" | "errors" | "full" | "summary".
    view: Option<String>,
    // Max entries to return; defaults to 100, capped at 500.
    limit: Option<u32>,
}
2160
2161pub async fn handle_workflow_dashboard(
2163 State(state): State<AppState>,
2164 headers: HeaderMap,
2165) -> impl IntoResponse {
2166 if let Err(e) = require_auth(&state, &headers) {
2167 return e.into_response();
2168 }
2169
2170 let client = build_kumiho_client(&state);
2171 let space_path = workflow_space_path(&state);
2172 let runs_space = workflow_runs_space_path(&state);
2173
2174 let definitions = match client.list_items(&space_path, false).await {
2176 Ok(items) => merge_with_builtins(enrich_items(&client, items).await),
2177 Err(_) => merge_with_builtins(Vec::new()),
2178 };
2179 let definitions_count = definitions.len();
2180
2181 let (recent_runs, total_runs) = match client.list_items(&runs_space, false).await {
2183 Ok(mut items) => {
2184 let total = items.len();
2185 items.sort_by(|a, b| {
2186 let a_time = a.created_at.as_deref().unwrap_or("");
2187 let b_time = b.created_at.as_deref().unwrap_or("");
2188 b_time.cmp(a_time)
2189 });
2190 items.truncate(5);
2191
2192 let krefs: Vec<String> = items.iter().map(|i| i.kref.clone()).collect();
2193 let rev_map = client
2194 .batch_get_revisions(&krefs, "latest")
2195 .await
2196 .unwrap_or_default();
2197
2198 let runs: Vec<WorkflowRunSummary> = items
2199 .iter()
2200 .map(|item| to_run_summary(item, rev_map.get(&item.kref)))
2201 .collect();
2202
2203 (runs, total)
2204 }
2205 Err(_) => (Vec::new(), 0),
2206 };
2207
2208 let active_runs = recent_runs
2209 .iter()
2210 .filter(|r| r.status == "running" || r.status == "paused")
2211 .count();
2212
2213 let dashboard = WorkflowDashboard {
2214 definitions_count,
2215 definitions,
2216 active_runs,
2217 recent_runs,
2218 total_runs,
2219 };
2220
2221 Json(serde_json::json!({ "dashboard": dashboard })).into_response()
2222}