1use super::AppState;
18use super::api::require_auth;
19use super::api_agents::build_kumiho_client;
20use super::kumiho_client::{ItemResponse, KumihoClient, KumihoError, RevisionResponse, slugify};
21use axum::{
22 extract::{Path, Query, State},
23 http::{HeaderMap, StatusCode},
24 response::{IntoResponse, Json},
25};
26use parking_lot::Mutex;
27use serde::{Deserialize, Serialize};
28use std::collections::HashMap;
29use std::sync::OnceLock;
30use std::time::Instant;
31
/// Name of the space, under the harness project, that holds workflow definitions.
const WORKFLOW_SPACE_NAME: &str = "Workflows";
/// Space name used by `workflow_runs_space_path` to build the runs path.
const WORKFLOW_RUNS_SPACE_NAME: &str = "WorkflowRuns";
/// Space name used by `workflow_run_requests_space_path` to build the run-requests path.
const WORKFLOW_RUN_REQUESTS_SPACE_NAME: &str = "WorkflowRunRequests";
35
36fn workflow_project(state: &AppState) -> String {
37 state.config.lock().kumiho.harness_project.clone()
38}
39
40fn workflow_space_path(state: &AppState) -> String {
41 format!("/{}/{}", workflow_project(state), WORKFLOW_SPACE_NAME)
42}
43
44fn workflow_runs_space_path(state: &AppState) -> String {
45 format!("/{}/{}", workflow_project(state), WORKFLOW_RUNS_SPACE_NAME)
46}
47
48fn workflow_run_requests_space_path(state: &AppState) -> String {
49 format!(
50 "/{}/{}",
51 workflow_project(state),
52 WORKFLOW_RUN_REQUESTS_SPACE_NAME
53 )
54}
55
/// One cached result of the workflow listing.
struct WorkflowCache {
    /// The workflow list that was stored.
    workflows: Vec<WorkflowResponse>,
    /// The `include_deprecated` flag the list was stored with; a lookup with a
    /// different flag is treated as a miss by `get_cached`.
    include_deprecated: bool,
    /// When the list was stored (or a back-dated instant after invalidation).
    fetched_at: Instant,
}
63
/// Process-wide, lazily initialised slot holding at most one cached workflow list.
static WORKFLOW_CACHE: OnceLock<Mutex<Option<WorkflowCache>>> = OnceLock::new();
/// Number of whole seconds for which a cached list counts as fresh.
const CACHE_TTL_SECS: u64 = 3;
66
67fn get_cached(include_deprecated: bool) -> Option<Vec<WorkflowResponse>> {
68 let lock = WORKFLOW_CACHE.get_or_init(|| Mutex::new(None));
69 let cache = lock.lock();
70 if let Some(ref c) = *cache {
71 if c.include_deprecated == include_deprecated
72 && c.fetched_at.elapsed().as_secs() < CACHE_TTL_SECS
73 {
74 return Some(c.workflows.clone());
75 }
76 }
77 None
78}
79
80fn set_cached(workflows: &[WorkflowResponse], include_deprecated: bool) {
81 let lock = WORKFLOW_CACHE.get_or_init(|| Mutex::new(None));
82 let mut cache = lock.lock();
83 *cache = Some(WorkflowCache {
84 workflows: workflows.to_vec(),
85 include_deprecated,
86 fetched_at: Instant::now(),
87 });
88}
89
90fn invalidate_cache() {
91 if let Some(lock) = WORKFLOW_CACHE.get() {
92 let mut cache = lock.lock();
93 if let Some(ref mut c) = *cache {
95 c.fetched_at = Instant::now() - std::time::Duration::from_secs(CACHE_TTL_SECS + 1);
96 }
97 }
98}
99
/// Query-string parameters accepted by the workflow list endpoint.
#[derive(Deserialize)]
pub struct WorkflowListQuery {
    /// Whether deprecated workflows are included; `false` when omitted.
    #[serde(default)]
    pub include_deprecated: bool,
    /// Optional search text; when present the handler searches instead of
    /// listing the space and bypasses the list cache.
    pub q: Option<String>,
}
108
/// JSON body used both to create a workflow and to add a new revision to one.
#[derive(Deserialize)]
pub struct CreateWorkflowBody {
    /// Display name; stored as `display_name` metadata and slugified for the item name.
    pub name: String,
    /// Free-text description stored in the revision metadata.
    pub description: String,
    /// Workflow definition text (passed to the validator as `workflow_yaml`).
    pub definition: String,
    /// Optional version string. Not read by the code visible in this section.
    #[serde(default)]
    pub version: Option<String>,
    /// Optional tags; when non-empty they are stored comma-joined under `tags`.
    #[serde(default)]
    pub tags: Option<Vec<String>>,
}
119
/// JSON body of the deprecate-workflow request.
#[derive(Deserialize)]
pub struct DeprecateBody {
    /// Kref of the workflow item to act on.
    pub kref: String,
    /// Requested deprecation state.
    /// NOTE(review): presumably `true` deprecates and `false` restores — confirm in the handler.
    pub deprecated: bool,
}
125
/// Query-string parameters for listing workflow runs.
#[derive(Deserialize)]
pub struct WorkflowRunsQuery {
    /// Maximum number of runs requested; defaults to `default_limit()` (20).
    #[serde(default = "default_limit")]
    pub limit: usize,
    /// Optional workflow identifier.
    /// NOTE(review): presumably filters runs by workflow name — the consumer is not visible here.
    #[serde(default)]
    pub workflow: Option<String>,
}
133
/// Serde default for `WorkflowRunsQuery::limit`.
fn default_limit() -> usize {
    const DEFAULT_RUN_LIMIT: usize = 20;
    DEFAULT_RUN_LIMIT
}
137
/// JSON body of a run-workflow request.
#[derive(Deserialize)]
pub struct RunWorkflowBody {
    /// Arbitrary JSON inputs for the run; JSON `null` when omitted.
    #[serde(default)]
    pub inputs: serde_json::Value,
    /// Optional working directory.
    /// NOTE(review): how this is applied is not visible in this section.
    #[serde(default)]
    pub cwd: Option<String>,
}
145
/// Workflow definition as returned to API clients.
#[derive(Serialize, Clone)]
pub struct WorkflowResponse {
    /// Item kref, or `builtin://<item_name>` for workflows discovered on disk.
    pub kref: String,
    /// Display name (falls back to the item name when no `display_name` metadata exists).
    pub name: String,
    /// Item name (a slug for builtin workflows).
    pub item_name: String,
    /// Whether the item is marked deprecated.
    pub deprecated: bool,
    /// Creation timestamp as reported by the item, if any.
    pub created_at: Option<String>,
    /// Human-readable description.
    pub description: String,
    /// Workflow definition text.
    pub definition: String,
    /// Version label: the revision number for stored workflows, the YAML
    /// `version` field (default "1.0") for builtins.
    pub version: String,
    /// Tags attached to the workflow.
    pub tags: Vec<String>,
    /// Number of steps recorded for, or counted in, the definition.
    pub steps: usize,
    /// Revision number backing this response; 0 when no revision is available.
    pub revision_number: i32,
    /// Origin of the workflow: "custom", "builtin" or "builtin-modified".
    /// NOTE(review): `default` only matters for deserialization, which this type does not derive.
    #[serde(default = "default_source")]
    pub source: String,
    /// Event triggers parsed from the definition; omitted from JSON when empty.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub triggers: Vec<WorkflowTrigger>,
}
169
/// Default value for `WorkflowResponse::source`.
fn default_source() -> String {
    String::from("custom")
}
173
/// One event trigger parsed from a workflow definition's `triggers:` section.
#[derive(Serialize, Clone, Debug)]
pub struct WorkflowTrigger {
    /// Value of the trigger's `on_kind` key.
    pub on_kind: String,
    /// Value of the trigger's `on_tag` key ("ready" when the key is absent).
    pub on_tag: String,
    /// Value of the trigger's `on_name_pattern` key; omitted from JSON when empty.
    #[serde(skip_serializing_if = "String::is_empty")]
    pub on_name_pattern: String,
}
181
/// Summary of a workflow run, built from the run item and its revision metadata.
///
/// All metadata-derived fields are plain strings copied from the metadata map
/// and are empty when the key is missing.
#[derive(Serialize, Clone)]
pub struct WorkflowRunSummary {
    /// Kref of the run item.
    pub kref: String,
    /// `run_id` metadata, or the item name when that is empty.
    pub run_id: String,
    /// `workflow_name` metadata, falling back to the `workflow` key.
    pub workflow_name: String,
    /// `status` metadata value.
    pub status: String,
    /// `started_at` metadata value.
    pub started_at: String,
    /// `completed_at` metadata value.
    pub completed_at: String,
    /// `steps_completed` metadata value (kept as a string).
    pub steps_completed: String,
    /// `steps_total` metadata value (kept as a string).
    pub steps_total: String,
    /// `error` metadata value.
    pub error: String,
    /// `workflow_item_kref` metadata value; omitted from JSON when empty.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub workflow_item_kref: String,
    /// `workflow_revision_kref` metadata value; omitted from JSON when empty.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub workflow_revision_kref: String,
}
203
/// One entry of a step transcript.
#[derive(Serialize, Clone)]
pub struct TranscriptEntry {
    /// Speaker label ("?" when the source entry has none).
    pub speaker: String,
    /// Text of the entry (empty when the source entry has none).
    pub content: String,
    /// Round number (0 when the source entry has none).
    pub round: u32,
}
210
/// Approval-related fields extracted from a step's `output_data`.
///
/// Every field is omitted from the serialized JSON when unset or empty.
#[derive(Serialize, Clone, Default)]
pub struct ApprovalOutputData {
    /// Value of the `awaiting_approval` boolean, if present.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub awaiting_approval: Option<bool>,
    /// Value of the `approval_message` string, if present.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub approval_message: Option<String>,
    /// String entries of the `approve_keywords` array.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub approve_keywords: Vec<String>,
    /// String entries of the `reject_keywords` array.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub reject_keywords: Vec<String>,
}
222
/// Per-step detail of a workflow run, decoded from a `step_<id>` metadata entry.
///
/// Except for `step_id` and `status`, fields are omitted from the serialized
/// JSON when empty or unset.
#[derive(Serialize, Clone)]
pub struct WorkflowStepDetail {
    /// Metadata key with the `step_` prefix removed.
    pub step_id: String,
    /// Step status ("unknown" when it cannot be determined).
    pub status: String,
    /// `agent_id` value from the step payload.
    #[serde(skip_serializing_if = "String::is_empty")]
    pub agent_id: String,
    /// `agent_type` value from the step payload.
    #[serde(skip_serializing_if = "String::is_empty")]
    pub agent_type: String,
    /// `role` value from the step payload.
    #[serde(skip_serializing_if = "String::is_empty")]
    pub role: String,
    /// `template_name` value from the step payload.
    #[serde(skip_serializing_if = "String::is_empty")]
    pub template_name: String,
    /// `output_preview` value from the step payload.
    #[serde(skip_serializing_if = "String::is_empty")]
    pub output_preview: String,
    /// `artifact_path` value from the step payload.
    #[serde(skip_serializing_if = "String::is_empty")]
    pub artifact_path: String,
    /// String entries of the payload's `skills` array.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub skills: Vec<String>,
    /// Transcript entries decoded from the payload's `transcript` string.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub transcript: Vec<TranscriptEntry>,
    /// Approval data decoded from the payload's `output_data`, if present.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub output_data: Option<ApprovalOutputData>,
}
246
/// Full view of a workflow run: the summary plus its per-step details.
#[derive(Serialize, Clone)]
pub struct WorkflowRunDetail {
    /// Run summary; its fields are flattened into the top level of the JSON object.
    #[serde(flatten)]
    pub summary: WorkflowRunSummary,
    /// Step details extracted from the run's revision metadata.
    pub steps: Vec<WorkflowStepDetail>,
}
253
/// Aggregate payload combining workflow definitions and run information.
/// NOTE(review): the code that fills this struct is not visible in this section;
/// the field notes below follow the field names and types only.
#[derive(Serialize)]
pub struct WorkflowDashboard {
    /// Number of workflow definitions.
    pub definitions_count: usize,
    /// The workflow definitions themselves.
    pub definitions: Vec<WorkflowResponse>,
    /// Count of runs considered active.
    pub active_runs: usize,
    /// Summaries of recent runs.
    pub recent_runs: Vec<WorkflowRunSummary>,
    /// Total number of runs.
    pub total_runs: usize,
}
262
263fn kumiho_err(e: KumihoError) -> (StatusCode, Json<serde_json::Value>) {
266 match &e {
267 KumihoError::Unreachable(_) => (
268 StatusCode::SERVICE_UNAVAILABLE,
269 Json(serde_json::json!({ "error": format!("Kumiho service unavailable: {e}") })),
270 ),
271 KumihoError::Api { status, body } => {
272 let code = if *status == 401 || *status == 403 {
273 StatusCode::BAD_GATEWAY
274 } else {
275 StatusCode::from_u16(*status).unwrap_or(StatusCode::BAD_GATEWAY)
276 };
277 (
278 code,
279 Json(serde_json::json!({ "error": format!("Kumiho upstream: {body}") })),
280 )
281 }
282 KumihoError::Decode(msg) => (
283 StatusCode::BAD_GATEWAY,
284 Json(serde_json::json!({ "error": format!("Bad response from Kumiho: {msg}") })),
285 ),
286 }
287}
288
289fn workflow_metadata(body: &CreateWorkflowBody) -> HashMap<String, String> {
290 let mut meta = HashMap::new();
291 meta.insert("display_name".to_string(), body.name.clone());
292 meta.insert("description".to_string(), body.description.clone());
293 meta.insert("definition".to_string(), body.definition.clone());
294 meta.insert("created_by".to_string(), "construct-dashboard".to_string());
295 let steps = count_yaml_steps(&body.definition);
297 meta.insert("steps".to_string(), steps.to_string());
298 if let Some(ref tags) = body.tags {
299 if !tags.is_empty() {
300 meta.insert("tags".to_string(), tags.join(","));
301 }
302 }
303 meta.insert(
305 "_search_text".to_string(),
306 format!("{} {}", body.name, body.description),
307 );
308 meta
309}
310
/// Counts the steps declared in a workflow definition using a line scan.
///
/// Scanning starts after the first line that is exactly `steps:` or `tasks:`
/// (ignoring surrounding whitespace) and counts every following line whose
/// trimmed text starts with `- id:`. The scan stops at the first non-empty
/// line that is not indented, not a list item and not a comment, i.e. at the
/// next top-level key.
///
/// Indentation may use spaces or tabs, matching the trigger parsers in this
/// file. This is a heuristic, not a YAML parser: `- id:` entries nested deeper
/// inside the section are counted as well.
fn count_yaml_steps(content: &str) -> usize {
    let mut count = 0;
    let mut in_steps = false;
    for line in content.lines() {
        let trimmed = line.trim();
        if trimmed == "steps:" || trimmed == "tasks:" {
            in_steps = true;
            continue;
        }
        if !in_steps {
            continue;
        }
        if trimmed.starts_with("- id:") {
            count += 1;
        }
        let indented = line.starts_with(' ') || line.starts_with('\t');
        let ends_section = !trimmed.is_empty()
            && !trimmed.starts_with('-')
            && !trimmed.starts_with('#')
            && !indented;
        if ends_section {
            break;
        }
    }
    count
}
336
337fn to_workflow_response(item: &ItemResponse, rev: Option<&RevisionResponse>) -> WorkflowResponse {
338 let meta = rev.map(|r| &r.metadata);
339 let get = |key: &str| -> String { meta.and_then(|m| m.get(key)).cloned().unwrap_or_default() };
340 let tags_str = get("tags");
341 let tags: Vec<String> = if tags_str.is_empty() {
342 Vec::new()
343 } else {
344 tags_str.split(',').map(|s| s.trim().to_string()).collect()
345 };
346 let steps: usize = get("steps").parse().unwrap_or(0);
347
348 let display_name = {
349 let n = get("display_name");
350 if n.is_empty() {
351 item.item_name.clone()
352 } else {
353 n
354 }
355 };
356
357 let definition = get("definition");
358 let triggers = extract_triggers(&definition);
359
360 WorkflowResponse {
361 kref: item.kref.clone(),
362 name: display_name,
363 item_name: item.item_name.clone(),
364 deprecated: item.deprecated,
365 created_at: item.created_at.clone(),
366 description: get("description"),
367 definition,
368 version: format!("{}", rev.map(|r| r.number).unwrap_or(0)),
369 tags,
370 steps,
371 revision_number: rev.map(|r| r.number).unwrap_or(0),
372 source: "custom".to_string(),
373 triggers,
374 }
375}
376
377async fn prefer_artifact_definitions(
385 client: &super::kumiho_client::KumihoClient,
386 revs: &mut HashMap<String, RevisionResponse>,
387) {
388 for rev in revs.values_mut() {
389 if let Ok(artifact) = client
390 .get_artifact_by_name(&rev.kref, "workflow.yaml")
391 .await
392 {
393 let path = artifact
394 .location
395 .strip_prefix("file://")
396 .unwrap_or(&artifact.location);
397 if let Ok(yaml) = tokio::fs::read_to_string(path).await {
398 rev.metadata.insert("definition".to_string(), yaml);
399 }
400 }
401 }
402}
403
404async fn enrich_items(
405 client: &super::kumiho_client::KumihoClient,
406 items: Vec<ItemResponse>,
407) -> Vec<WorkflowResponse> {
408 let items: Vec<ItemResponse> = items.into_iter().filter(|i| i.kind == "workflow").collect();
411
412 if items.is_empty() {
413 return Vec::new();
414 }
415
416 let krefs: Vec<String> = items.iter().map(|i| i.kref.clone()).collect();
417
418 if let Ok(mut rev_map) = client.batch_get_revisions(&krefs, "published").await {
419 let missing: Vec<String> = krefs
420 .iter()
421 .filter(|k| !rev_map.contains_key(*k))
422 .cloned()
423 .collect();
424 let mut latest_map = if !missing.is_empty() {
425 client
426 .batch_get_revisions(&missing, "latest")
427 .await
428 .unwrap_or_default()
429 } else {
430 HashMap::new()
431 };
432
433 prefer_artifact_definitions(client, &mut rev_map).await;
439 prefer_artifact_definitions(client, &mut latest_map).await;
440
441 return items
442 .iter()
443 .map(|item| {
444 let rev = rev_map
445 .get(&item.kref)
446 .or_else(|| latest_map.get(&item.kref));
447 to_workflow_response(item, rev)
448 })
449 .collect();
450 }
451
452 let mut workflows = Vec::with_capacity(items.len());
454 for item in &items {
455 let rev = client.get_published_or_latest(&item.kref).await.ok();
456 workflows.push(to_workflow_response(item, rev.as_ref()));
457 }
458 workflows
459}
460
461fn to_run_summary(item: &ItemResponse, rev: Option<&RevisionResponse>) -> WorkflowRunSummary {
462 let meta = rev.map(|r| &r.metadata);
463 let get = |key: &str| -> String { meta.and_then(|m| m.get(key)).cloned().unwrap_or_default() };
464
465 let run_id_meta = get("run_id");
466 WorkflowRunSummary {
467 kref: item.kref.clone(),
468 run_id: if run_id_meta.is_empty() {
469 item.item_name.clone()
470 } else {
471 run_id_meta
472 },
473 workflow_name: {
474 let wn = get("workflow_name");
475 if wn.is_empty() { get("workflow") } else { wn }
476 },
477 status: get("status"),
478 started_at: get("started_at"),
479 completed_at: get("completed_at"),
480 steps_completed: get("steps_completed"),
481 steps_total: get("steps_total"),
482 error: get("error"),
483 workflow_item_kref: get("workflow_item_kref"),
484 workflow_revision_kref: get("workflow_revision_kref"),
485 }
486}
487
488fn extract_steps_from_metadata(meta: &HashMap<String, String>) -> Vec<WorkflowStepDetail> {
489 const SKIP_KEYS: &[&str] = &["step_count", "steps_completed", "steps_total"];
491
492 let mut steps = Vec::new();
493 for (key, value) in meta {
494 if SKIP_KEYS.contains(&key.as_str()) {
495 continue;
496 }
497 if let Some(step_id) = key.strip_prefix("step_") {
498 if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(value) {
501 if !parsed.is_object() {
503 continue;
504 }
505 let skills = parsed
506 .get("skills")
507 .and_then(|v| v.as_array())
508 .map(|arr| {
509 arr.iter()
510 .filter_map(|s| s.as_str().map(|s| s.to_string()))
511 .collect::<Vec<_>>()
512 })
513 .unwrap_or_default();
514 let transcript = parsed
516 .get("transcript")
517 .and_then(|v| v.as_str())
518 .and_then(|s| serde_json::from_str::<Vec<serde_json::Value>>(s).ok())
519 .map(|arr| {
520 arr.iter()
521 .map(|entry| TranscriptEntry {
522 speaker: entry
523 .get("speaker")
524 .and_then(|v| v.as_str())
525 .unwrap_or("?")
526 .to_string(),
527 content: entry
528 .get("content")
529 .and_then(|v| v.as_str())
530 .unwrap_or("")
531 .to_string(),
532 round: entry.get("round").and_then(|v| v.as_u64()).unwrap_or(0)
533 as u32,
534 })
535 .collect::<Vec<_>>()
536 })
537 .unwrap_or_default();
538 let output_data = parsed.get("output_data").and_then(|v| {
540 let obj = if let Some(s) = v.as_str() {
542 serde_json::from_str::<serde_json::Value>(s).ok()
543 } else {
544 Some(v.clone())
545 };
546 obj.map(|o| ApprovalOutputData {
547 awaiting_approval: o.get("awaiting_approval").and_then(|v| v.as_bool()),
548 approval_message: o
549 .get("approval_message")
550 .and_then(|v| v.as_str())
551 .map(String::from),
552 approve_keywords: o
553 .get("approve_keywords")
554 .and_then(|v| v.as_array())
555 .map(|arr| {
556 arr.iter()
557 .filter_map(|s| s.as_str().map(String::from))
558 .collect()
559 })
560 .unwrap_or_default(),
561 reject_keywords: o
562 .get("reject_keywords")
563 .and_then(|v| v.as_array())
564 .map(|arr| {
565 arr.iter()
566 .filter_map(|s| s.as_str().map(String::from))
567 .collect()
568 })
569 .unwrap_or_default(),
570 })
571 });
572 steps.push(WorkflowStepDetail {
573 step_id: step_id.to_string(),
574 status: parsed
575 .get("status")
576 .and_then(|v| v.as_str())
577 .unwrap_or("unknown")
578 .to_string(),
579 agent_id: parsed
580 .get("agent_id")
581 .and_then(|v| v.as_str())
582 .unwrap_or("")
583 .to_string(),
584 agent_type: parsed
585 .get("agent_type")
586 .and_then(|v| v.as_str())
587 .unwrap_or("")
588 .to_string(),
589 role: parsed
590 .get("role")
591 .and_then(|v| v.as_str())
592 .unwrap_or("")
593 .to_string(),
594 template_name: parsed
595 .get("template_name")
596 .and_then(|v| v.as_str())
597 .unwrap_or("")
598 .to_string(),
599 output_preview: parsed
600 .get("output_preview")
601 .and_then(|v| v.as_str())
602 .unwrap_or("")
603 .to_string(),
604 artifact_path: parsed
605 .get("artifact_path")
606 .and_then(|v| v.as_str())
607 .unwrap_or("")
608 .to_string(),
609 skills,
610 transcript,
611 output_data,
612 });
613 } else if value.contains(r#""status""#) {
614 let status = if let Some(start) = value.find(r#""status": ""#) {
616 let rest = &value[start + 11..];
617 rest.split('"').next().unwrap_or("unknown")
618 } else {
619 "unknown"
620 };
621 steps.push(WorkflowStepDetail {
622 step_id: step_id.to_string(),
623 status: status.to_string(),
624 agent_id: String::new(),
625 agent_type: String::new(),
626 role: String::new(),
627 template_name: String::new(),
628 output_preview: String::new(),
629 artifact_path: String::new(),
630 skills: Vec::new(),
631 transcript: Vec::new(),
632 output_data: None,
633 });
634 }
635 }
636 }
637 steps
638}
639
640fn to_run_detail(item: &ItemResponse, rev: Option<&RevisionResponse>) -> WorkflowRunDetail {
641 let summary = to_run_summary(item, rev);
642 let steps = rev
643 .map(|r| extract_steps_from_metadata(&r.metadata))
644 .unwrap_or_default();
645 WorkflowRunDetail { summary, steps }
646}
647
/// Directory, relative to the user's home directory, that is scanned for
/// builtin workflow YAML files.
const BUILTIN_WORKFLOWS_DIR: &str = ".construct/operator_mcp/workflow/builtins";
652
653fn discover_builtin_workflows() -> Vec<WorkflowResponse> {
657 let home = directories::UserDirs::new()
658 .map(|u| u.home_dir().to_path_buf())
659 .unwrap_or_default();
660 let builtins_dir = home.join(BUILTIN_WORKFLOWS_DIR);
661 let Ok(entries) = std::fs::read_dir(&builtins_dir) else {
662 return Vec::new();
663 };
664
665 let mut workflows = Vec::new();
666 for entry in entries.flatten() {
667 let path = entry.path();
668 let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
669 if ext != "yaml" && ext != "yml" {
670 continue;
671 }
672 let Ok(content) = std::fs::read_to_string(&path) else {
673 continue;
674 };
675 let name = extract_yaml_field(&content, "name").unwrap_or_else(|| {
677 path.file_stem()
678 .unwrap_or_default()
679 .to_string_lossy()
680 .into_owned()
681 });
682 let description = extract_yaml_field(&content, "description").unwrap_or_default();
683 let version = extract_yaml_field(&content, "version").unwrap_or_else(|| "1.0".into());
684 let tags_str = extract_yaml_field(&content, "tags").unwrap_or_default();
685 let tags: Vec<String> = if tags_str.is_empty() {
686 Vec::new()
687 } else {
688 tags_str
690 .trim_start_matches('[')
691 .trim_end_matches(']')
692 .split(',')
693 .map(|s| s.trim().trim_matches('"').trim_matches('\'').to_string())
694 .filter(|s| !s.is_empty())
695 .collect()
696 };
697 let steps = count_yaml_steps(&content);
698 let item_name = slugify(&name);
699
700 let triggers = extract_triggers(&content);
701 workflows.push(WorkflowResponse {
702 kref: format!("builtin://{item_name}"),
703 name,
704 item_name,
705 deprecated: false,
706 created_at: None,
707 description,
708 definition: content,
709 version,
710 tags,
711 steps,
712 revision_number: 0,
713 source: "builtin".to_string(),
714 triggers,
715 });
716 }
717 workflows
718}
719
/// Finds the first non-empty scalar value of `field` in the header part of a
/// workflow YAML document using a line scan.
///
/// Each line is trimmed and must start with `field` immediately followed by
/// `:`. The value is trimmed and stripped of surrounding double and single
/// quotes; empty values are skipped and the search continues. Scanning stops
/// at the first line that is exactly `steps:` or `inputs:`, so keys inside
/// those sections are never matched. Returns `None` when nothing is found.
fn extract_yaml_field(content: &str, field: &str) -> Option<String> {
    content
        .lines()
        .map(str::trim)
        .take_while(|line| !matches!(*line, "steps:" | "inputs:"))
        .find_map(|line| {
            let raw = line.strip_prefix(field)?.strip_prefix(':')?;
            let value = raw.trim().trim_matches('"').trim_matches('\'');
            if value.is_empty() {
                None
            } else {
                Some(value.to_string())
            }
        })
}
741
742fn extract_triggers(content: &str) -> Vec<WorkflowTrigger> {
747 let mut triggers = Vec::new();
748 let mut in_triggers = false;
749 let mut current_kind = String::new();
750 let mut current_tag = String::new();
751 let mut current_pattern = String::new();
752
753 for line in content.lines() {
754 let trimmed = line.trim();
755 if trimmed == "triggers:" {
756 in_triggers = true;
757 continue;
758 }
759 if !in_triggers {
760 continue;
761 }
762 if !trimmed.is_empty()
764 && !trimmed.starts_with('-')
765 && !trimmed.starts_with('#')
766 && !line.starts_with(' ')
767 && !line.starts_with('\t')
768 {
769 break;
770 }
771 if trimmed.starts_with("- ") {
773 if !current_kind.is_empty() {
774 triggers.push(WorkflowTrigger {
775 on_kind: std::mem::take(&mut current_kind),
776 on_tag: if current_tag.is_empty() {
777 "ready".to_string()
778 } else {
779 std::mem::take(&mut current_tag)
780 },
781 on_name_pattern: std::mem::take(&mut current_pattern),
782 });
783 }
784 let after_dash = trimmed.strip_prefix("- ").unwrap_or("");
786 if let Some((k, v)) = after_dash.split_once(':') {
787 let k = k.trim();
788 let v = v.trim().trim_matches('"').trim_matches('\'');
789 match k {
790 "on_kind" => current_kind = v.to_string(),
791 "on_tag" => current_tag = v.to_string(),
792 "on_name_pattern" => current_pattern = v.to_string(),
793 _ => {}
794 }
795 }
796 continue;
797 }
798 if let Some((k, v)) = trimmed.split_once(':') {
800 let k = k.trim();
801 let v = v.trim().trim_matches('"').trim_matches('\'');
802 match k {
803 "on_kind" => current_kind = v.to_string(),
804 "on_tag" => current_tag = v.to_string(),
805 "on_name_pattern" => current_pattern = v.to_string(),
806 _ => {}
807 }
808 }
809 }
810 if !current_kind.is_empty() {
812 triggers.push(WorkflowTrigger {
813 on_kind: current_kind,
814 on_tag: if current_tag.is_empty() {
815 "ready".to_string()
816 } else {
817 current_tag
818 },
819 on_name_pattern: current_pattern,
820 });
821 }
822 triggers
823}
824
/// Parses cron triggers from the `triggers:` section of a workflow definition
/// using a line scan.
///
/// Returns one `(cron_expression, timezone)` pair per list item that carries a
/// non-empty `cron` key; the timezone comes from a `timezone` or `tz` key of
/// the same item and is `None` when absent. Section boundaries are detected as
/// in `extract_triggers`: scanning starts after a line that is exactly
/// `triggers:` and stops at the next non-indented line that is neither a list
/// item nor a comment. Surrounding quotes are stripped from values.
///
/// The timezone is reset at every list item, so a `timezone` found on an item
/// without `cron` is never attached to a later cron trigger.
fn extract_cron_triggers(content: &str) -> Vec<(String, Option<String>)> {
    let mut results = Vec::new();
    let mut in_triggers = false;
    let mut current_cron = String::new();
    let mut current_tz: Option<String> = None;

    for line in content.lines() {
        let trimmed = line.trim();
        if trimmed == "triggers:" {
            in_triggers = true;
            continue;
        }
        if !in_triggers {
            continue;
        }

        // A non-indented key that is neither a list item nor a comment ends the section.
        let indented = line.starts_with(' ') || line.starts_with('\t');
        if !trimmed.is_empty()
            && !trimmed.starts_with('-')
            && !trimmed.starts_with('#')
            && !indented
        {
            break;
        }

        let entry = match trimmed.strip_prefix("- ") {
            Some(rest) => {
                // New list item: flush the previous cron trigger, if any.
                if !current_cron.is_empty() {
                    results.push((std::mem::take(&mut current_cron), current_tz.take()));
                }
                // Drop a timezone left over from an item that had no `cron`.
                current_tz = None;
                rest
            }
            None => trimmed,
        };

        if let Some((k, v)) = entry.split_once(':') {
            let v = v.trim().trim_matches('"').trim_matches('\'');
            match k.trim() {
                "cron" if !v.is_empty() => current_cron = v.to_string(),
                "timezone" | "tz" if !v.is_empty() => current_tz = Some(v.to_string()),
                _ => {}
            }
        }
    }

    if !current_cron.is_empty() {
        results.push((current_cron, current_tz));
    }
    results
}
888
889async fn persist_workflow_artifact(
895 client: &KumihoClient,
896 revision_kref: &str,
897 revision_number: i32,
898 workflow_name: &str,
899 definition: &str,
900) {
901 let home = directories::UserDirs::new()
902 .map(|u| u.home_dir().to_path_buf())
903 .unwrap_or_default();
904 let dir = home.join(".construct/workflows");
905 let _ = tokio::fs::create_dir_all(&dir).await;
906
907 let slug = slugify(workflow_name);
908 let file_path = dir.join(format!("{slug}.r{revision_number}.yaml"));
909 let location = format!("file://{}", file_path.display());
910
911 if let Err(e) = tokio::fs::write(&file_path, definition).await {
912 tracing::warn!("Failed to write workflow YAML for {workflow_name}: {e}");
913 return;
914 }
915
916 if let Err(e) = client
917 .create_artifact(revision_kref, "workflow.yaml", &location, HashMap::new())
918 .await
919 {
920 tracing::warn!("Failed to create artifact for workflow {workflow_name}: {e}");
921 } else {
922 tracing::info!("Persisted workflow artifact: {location}");
923 }
924}
925
926fn sync_cron_for_workflow(state: &AppState, workflow_name: &str, definition: &str) {
927 let cron_triggers = extract_cron_triggers(definition);
928 let config = state.config.lock();
929
930 if let Err(e) = crate::cron::remove_workflow_cron_jobs(&config, workflow_name) {
932 tracing::warn!("Failed to remove old cron jobs for workflow {workflow_name}: {e}");
933 }
934
935 if cron_triggers.is_empty() {
936 return;
937 }
938
939 let wf_crons: Vec<(String, String, Option<String>)> = cron_triggers
940 .into_iter()
941 .map(|(expr, tz)| (workflow_name.to_string(), expr, tz))
942 .collect();
943
944 if let Err(e) = crate::cron::sync_workflow_cron_jobs(&config, &wf_crons) {
945 tracing::warn!("Failed to sync cron triggers for workflow {workflow_name}: {e}");
946 }
947}
948
949fn merge_with_builtins(mut kumiho_workflows: Vec<WorkflowResponse>) -> Vec<WorkflowResponse> {
955 let builtins = discover_builtin_workflows();
956 if builtins.is_empty() {
957 return kumiho_workflows;
958 }
959
960 let builtin_names: std::collections::HashSet<String> =
961 builtins.iter().map(|b| b.item_name.clone()).collect();
962
963 for wf in &mut kumiho_workflows {
965 if builtin_names.contains(&wf.item_name) {
966 wf.source = "builtin-modified".to_string();
967 }
968 }
969
970 let kumiho_names: std::collections::HashSet<String> = kumiho_workflows
972 .iter()
973 .map(|w| w.item_name.clone())
974 .collect();
975 for builtin in builtins {
976 if !kumiho_names.contains(&builtin.item_name) {
977 kumiho_workflows.push(builtin);
978 }
979 }
980
981 kumiho_workflows
982}
983
984pub async fn handle_list_workflows(
988 State(state): State<AppState>,
989 headers: HeaderMap,
990 Query(query): Query<WorkflowListQuery>,
991) -> impl IntoResponse {
992 if let Err(e) = require_auth(&state, &headers) {
993 return e.into_response();
994 }
995
996 let client = build_kumiho_client(&state);
997 let project = workflow_project(&state);
998 let space_path = workflow_space_path(&state);
999
1000 if query.q.is_none() {
1002 if let Some(cached) = get_cached(query.include_deprecated) {
1003 return Json(serde_json::json!({ "workflows": cached })).into_response();
1004 }
1005 }
1006
1007 let items_result = if let Some(ref q) = query.q {
1008 client
1009 .search_items(q, &project, "workflow", query.include_deprecated)
1010 .await
1011 .map(|results| results.into_iter().map(|sr| sr.item).collect::<Vec<_>>())
1012 } else {
1013 client
1014 .list_items(&space_path, query.include_deprecated)
1015 .await
1016 };
1017
1018 match items_result {
1019 Ok(items) => {
1020 let workflows = merge_with_builtins(enrich_items(&client, items).await);
1021 if query.q.is_none() {
1022 set_cached(&workflows, query.include_deprecated);
1023 }
1024 Json(serde_json::json!({ "workflows": workflows })).into_response()
1025 }
1026 Err(ref e) if matches!(e, KumihoError::Api { status: 404, .. }) => {
1027 let _ = client.ensure_project(&project).await;
1028 let _ = client.ensure_space(&project, WORKFLOW_SPACE_NAME).await;
1029 let workflows = merge_with_builtins(Vec::new());
1030 Json(serde_json::json!({ "workflows": workflows })).into_response()
1031 }
1032 Err(e) => {
1033 if query.q.is_none() {
1035 let lock = WORKFLOW_CACHE.get_or_init(|| Mutex::new(None));
1036 let cache = lock.lock();
1037 if let Some(ref c) = *cache {
1038 tracing::warn!("Workflows list failed, returning stale cache: {e}");
1039 return Json(serde_json::json!({ "workflows": c.workflows })).into_response();
1040 }
1041 }
1042 kumiho_err(e).into_response()
1043 }
1044 }
1045}
1046
/// Decoded result of the operator's `validate_workflow` tool.
#[derive(Debug)]
struct ValidationOutcome {
    /// The tool's `valid` flag.
    valid: bool,
    /// Entries of the tool's `errors` array (empty when absent).
    errors: Vec<serde_json::Value>,
    /// Entries of the tool's `warnings` array (empty when absent).
    warnings: Vec<serde_json::Value>,
}
1058
1059async fn validate_via_operator(
1063 state: &AppState,
1064 args: serde_json::Map<String, serde_json::Value>,
1065) -> Result<ValidationOutcome, String> {
1066 let tool_name = format!(
1067 "{}__validate_workflow",
1068 crate::agent::operator::OPERATOR_SERVER_NAME
1069 );
1070
1071 let registry = state
1072 .mcp_registry
1073 .as_ref()
1074 .ok_or_else(|| "MCP registry not available — operator not connected".to_string())?;
1075
1076 let fut = registry.call_tool(&tool_name, serde_json::Value::Object(args));
1077 let result_str = match tokio::time::timeout(std::time::Duration::from_secs(15), fut).await {
1078 Ok(Ok(s)) => s,
1079 Ok(Err(e)) => return Err(format!("operator validate_workflow failed: {e:#}")),
1080 Err(_) => return Err("operator validate_workflow timed out (15s)".to_string()),
1081 };
1082
1083 let outer: serde_json::Value = serde_json::from_str(&result_str)
1085 .map_err(|e| format!("validate_workflow: outer JSON parse failed: {e}"))?;
1086
1087 let inner_text = outer
1088 .get("content")
1089 .and_then(|c| c.get(0))
1090 .and_then(|c0| c0.get("text"))
1091 .and_then(|t| t.as_str())
1092 .ok_or_else(|| "validate_workflow: missing content[0].text".to_string())?;
1093
1094 let inner: serde_json::Value = serde_json::from_str(inner_text)
1095 .map_err(|e| format!("validate_workflow: inner JSON parse failed: {e}"))?;
1096
1097 let valid = inner
1098 .get("valid")
1099 .and_then(|v| v.as_bool())
1100 .ok_or_else(|| "validate_workflow: missing `valid` field".to_string())?;
1101 let errors = inner
1102 .get("errors")
1103 .and_then(|v| v.as_array())
1104 .cloned()
1105 .unwrap_or_default();
1106 let warnings = inner
1107 .get("warnings")
1108 .and_then(|v| v.as_array())
1109 .cloned()
1110 .unwrap_or_default();
1111
1112 Ok(ValidationOutcome {
1113 valid,
1114 errors,
1115 warnings,
1116 })
1117}
1118
1119fn validation_error_response(
1121 outcome: &ValidationOutcome,
1122 context: &str,
1123) -> (StatusCode, Json<serde_json::Value>) {
1124 (
1125 StatusCode::BAD_REQUEST,
1126 Json(serde_json::json!({
1127 "error": format!("Workflow validation failed: {context}"),
1128 "valid": false,
1129 "errors": outcome.errors,
1130 "warnings": outcome.warnings,
1131 })),
1132 )
1133}
1134
1135pub async fn handle_create_workflow(
1137 State(state): State<AppState>,
1138 headers: HeaderMap,
1139 Json(body): Json<CreateWorkflowBody>,
1140) -> impl IntoResponse {
1141 if let Err(e) = require_auth(&state, &headers) {
1142 return e.into_response();
1143 }
1144
1145 let mut v_args = serde_json::Map::new();
1148 v_args.insert(
1149 "workflow_yaml".to_string(),
1150 serde_json::Value::String(body.definition.clone()),
1151 );
1152 match validate_via_operator(&state, v_args).await {
1153 Ok(outcome) if !outcome.valid => {
1154 return validation_error_response(&outcome, "cannot save invalid workflow")
1155 .into_response();
1156 }
1157 Ok(_) => {}
1158 Err(e) => {
1159 tracing::warn!("create_workflow: validation skipped (infra error): {e}");
1160 }
1161 }
1162
1163 let client = build_kumiho_client(&state);
1164 let project = workflow_project(&state);
1165 let space_path = workflow_space_path(&state);
1166
1167 if let Err(e) = client.ensure_project(&project).await {
1168 return kumiho_err(e).into_response();
1169 }
1170 if let Err(e) = client.ensure_space(&project, WORKFLOW_SPACE_NAME).await {
1171 return kumiho_err(e).into_response();
1172 }
1173
1174 let slug = slugify(&body.name);
1175 let item = match client
1176 .create_item(&space_path, &slug, "workflow", HashMap::new())
1177 .await
1178 {
1179 Ok(item) => item,
1180 Err(e) => return kumiho_err(e).into_response(),
1181 };
1182
1183 let metadata = workflow_metadata(&body);
1184 let rev = match client.create_revision(&item.kref, metadata).await {
1185 Ok(rev) => rev,
1186 Err(e) => return kumiho_err(e).into_response(),
1187 };
1188
1189 persist_workflow_artifact(&client, &rev.kref, rev.number, &body.name, &body.definition).await;
1191 let _ = client.tag_revision(&rev.kref, "published").await;
1192
1193 invalidate_cache();
1194 sync_cron_for_workflow(&state, &body.name, &body.definition);
1195
1196 let workflow = to_workflow_response(&item, Some(&rev));
1197 (
1198 StatusCode::CREATED,
1199 Json(serde_json::json!({ "workflow": workflow })),
1200 )
1201 .into_response()
1202}
1203
1204pub async fn handle_update_workflow(
1206 State(state): State<AppState>,
1207 headers: HeaderMap,
1208 Path(kref): Path<String>,
1209 Json(body): Json<CreateWorkflowBody>,
1210) -> impl IntoResponse {
1211 if let Err(e) = require_auth(&state, &headers) {
1212 return e.into_response();
1213 }
1214
1215 let mut v_args = serde_json::Map::new();
1217 v_args.insert(
1218 "workflow_yaml".to_string(),
1219 serde_json::Value::String(body.definition.clone()),
1220 );
1221 match validate_via_operator(&state, v_args).await {
1222 Ok(outcome) if !outcome.valid => {
1223 return validation_error_response(&outcome, "cannot save invalid workflow")
1224 .into_response();
1225 }
1226 Ok(_) => {}
1227 Err(e) => {
1228 tracing::warn!("update_workflow: validation skipped (infra error): {e}");
1229 }
1230 }
1231
1232 let kref = format!("kref://{kref}");
1233 let client = build_kumiho_client(&state);
1234
1235 let metadata = workflow_metadata(&body);
1236 let rev = match client.create_revision(&kref, metadata).await {
1237 Ok(rev) => rev,
1238 Err(e) => return kumiho_err(e).into_response(),
1239 };
1240
1241 persist_workflow_artifact(&client, &rev.kref, rev.number, &body.name, &body.definition).await;
1243 let _ = client.tag_revision(&rev.kref, "published").await;
1244
1245 let items = match client.list_items(&workflow_space_path(&state), true).await {
1246 Ok(items) => items,
1247 Err(e) => return kumiho_err(e).into_response(),
1248 };
1249
1250 invalidate_cache();
1251 sync_cron_for_workflow(&state, &body.name, &body.definition);
1252
1253 let item = items.iter().find(|i| i.kref == kref);
1254 match item {
1255 Some(item) => {
1256 let workflow = to_workflow_response(item, Some(&rev));
1257 Json(serde_json::json!({ "workflow": workflow })).into_response()
1258 }
1259 None => {
1260 let fallback = ItemResponse {
1261 kref: kref.clone(),
1262 name: body.name.clone(),
1263 item_name: body.name.clone(),
1264 kind: "workflow".to_string(),
1265 deprecated: false,
1266 created_at: None,
1267 metadata: HashMap::new(),
1268 };
1269 let workflow = to_workflow_response(&fallback, Some(&rev));
1270 Json(serde_json::json!({ "workflow": workflow })).into_response()
1271 }
1272 }
1273}
1274
1275pub async fn handle_deprecate_workflow(
1277 State(state): State<AppState>,
1278 headers: HeaderMap,
1279 Json(body): Json<DeprecateBody>,
1280) -> impl IntoResponse {
1281 if let Err(e) = require_auth(&state, &headers) {
1282 return e.into_response();
1283 }
1284
1285 let kref = body.kref.clone();
1286 let client = build_kumiho_client(&state);
1287
1288 match client.deprecate_item(&kref, body.deprecated).await {
1289 Ok(item) => {
1290 invalidate_cache();
1291 let rev = client.get_published_or_latest(&kref).await.ok();
1292
1293 if body.deprecated {
1295 if let Some(item_segment) = kref.split('/').last() {
1297 let workflow_name = item_segment
1298 .rsplit_once('.')
1299 .map(|(name, _kind)| name)
1300 .unwrap_or(item_segment);
1301 let config = state.config.lock();
1302 if let Err(e) = crate::cron::remove_workflow_cron_jobs(&config, workflow_name) {
1303 tracing::warn!("Failed to remove cron jobs for deprecated workflow: {e}");
1304 }
1305 }
1306 } else if let Some(ref rev) = rev {
1307 if let Some(definition) = rev.metadata.get("definition") {
1309 sync_cron_for_workflow(&state, &item.item_name, definition);
1310 }
1311 }
1312
1313 let workflow = to_workflow_response(&item, rev.as_ref());
1314 Json(serde_json::json!({ "workflow": workflow })).into_response()
1315 }
1316 Err(e) => kumiho_err(e).into_response(),
1317 }
1318}
1319
1320pub async fn handle_delete_workflow(
1322 State(state): State<AppState>,
1323 headers: HeaderMap,
1324 Path(kref): Path<String>,
1325) -> impl IntoResponse {
1326 if let Err(e) = require_auth(&state, &headers) {
1327 return e.into_response();
1328 }
1329
1330 let kref = format!("kref://{kref}");
1331 let client = build_kumiho_client(&state);
1332
1333 match client.delete_item(&kref).await {
1334 Ok(()) => {
1335 invalidate_cache();
1336
1337 if let Some(item_segment) = kref.split('/').last() {
1341 let workflow_name = item_segment
1342 .rsplit_once('.')
1343 .map(|(name, _kind)| name)
1344 .unwrap_or(item_segment);
1345 let config = state.config.lock();
1346 if let Err(e) = crate::cron::remove_workflow_cron_jobs(&config, workflow_name) {
1347 tracing::warn!("Failed to remove cron jobs for deleted workflow: {e}");
1348 }
1349 }
1350
1351 StatusCode::NO_CONTENT.into_response()
1352 }
1353 Err(e) => kumiho_err(e).into_response(),
1354 }
1355}
1356
1357pub async fn handle_run_workflow(
1362 State(state): State<AppState>,
1363 headers: HeaderMap,
1364 Path(name): Path<String>,
1365 body: Option<Json<RunWorkflowBody>>,
1366) -> impl IntoResponse {
1367 if let Err(e) = require_auth(&state, &headers) {
1368 return e.into_response();
1369 }
1370
1371 let inputs = body
1372 .as_ref()
1373 .map(|b| b.inputs.clone())
1374 .unwrap_or(serde_json::Value::Object(Default::default()));
1375 let cwd = body.as_ref().and_then(|b| b.cwd.clone());
1376
1377 let mut v_args = serde_json::Map::new();
1381 v_args.insert(
1382 "workflow".to_string(),
1383 serde_json::Value::String(name.clone()),
1384 );
1385 if let Some(ref c) = cwd {
1386 v_args.insert("cwd".to_string(), serde_json::Value::String(c.clone()));
1387 }
1388 match validate_via_operator(&state, v_args).await {
1389 Ok(outcome) if !outcome.valid => {
1390 return validation_error_response(&outcome, "cannot dispatch invalid workflow")
1391 .into_response();
1392 }
1393 Ok(_) => {}
1394 Err(e) => {
1395 tracing::warn!("run_workflow: validation skipped (infra error): {e}");
1396 }
1397 }
1398
1399 let run_id = uuid::Uuid::new_v4().to_string();
1400 let now = chrono::Utc::now().to_rfc3339();
1401
1402 let client = build_kumiho_client(&state);
1403 let project = workflow_project(&state);
1404 let requests_space_path = workflow_run_requests_space_path(&state);
1405
1406 let _ = client.ensure_project(&project).await;
1408 let _ = client
1409 .ensure_space(&project, WORKFLOW_RUN_REQUESTS_SPACE_NAME)
1410 .await;
1411
1412 let mut metadata = HashMap::new();
1413 metadata.insert("workflow_name".to_string(), name.clone());
1414 metadata.insert("run_id".to_string(), run_id.clone());
1415 metadata.insert("inputs".to_string(), inputs.to_string());
1416 metadata.insert("cwd".to_string(), cwd.unwrap_or_default());
1417 metadata.insert("trigger_source".to_string(), "api".to_string());
1418 metadata.insert("requested_at".to_string(), now);
1419
1420 let item_name = format!("run-{}", &run_id[..run_id.len().min(12)]);
1421
1422 match client
1423 .create_item(
1424 &requests_space_path,
1425 &item_name,
1426 "workflow-run-request",
1427 metadata.clone(),
1428 )
1429 .await
1430 {
1431 Ok(item) => {
1432 if let Ok(rev) = client.create_revision(&item.kref, metadata).await {
1433 let _ = client.tag_revision(&rev.kref, "pending").await;
1434 }
1435 (
1436 StatusCode::OK,
1437 Json(serde_json::json!({
1438 "run_id": run_id,
1439 "workflow": name,
1440 "status": "pending",
1441 })),
1442 )
1443 .into_response()
1444 }
1445 Err(e) => {
1446 tracing::warn!("Failed to create workflow run request: {e}");
1447 (
1448 StatusCode::INTERNAL_SERVER_ERROR,
1449 Json(serde_json::json!({
1450 "error": format!("Failed to create run request: {e}")
1451 })),
1452 )
1453 .into_response()
1454 }
1455 }
1456}
1457
1458pub async fn handle_get_workflow_by_revision(
1465 State(state): State<AppState>,
1466 headers: HeaderMap,
1467 Path(kref): Path<String>,
1468) -> impl IntoResponse {
1469 if let Err(e) = require_auth(&state, &headers) {
1470 return e.into_response();
1471 }
1472
1473 let revision_kref = if kref.starts_with("kref://") {
1474 kref.clone()
1475 } else {
1476 format!("kref://{kref}")
1477 };
1478
1479 let client = build_kumiho_client(&state);
1480
1481 let mut rev = match client.get_revision(&revision_kref).await {
1482 Ok(r) => r,
1483 Err(e) => return kumiho_err(e).into_response(),
1484 };
1485
1486 if let Ok(artifact) = client
1492 .get_artifact_by_name(&rev.kref, "workflow.yaml")
1493 .await
1494 {
1495 let path = artifact
1496 .location
1497 .strip_prefix("file://")
1498 .unwrap_or(&artifact.location);
1499 if let Ok(yaml) = tokio::fs::read_to_string(path).await {
1500 rev.metadata.insert("definition".to_string(), yaml);
1501 }
1502 }
1503
1504 let item_name = rev
1507 .item_kref
1508 .rsplit('/')
1509 .next()
1510 .map(|seg| {
1511 seg.rsplit_once('.')
1512 .map(|(n, _)| n)
1513 .unwrap_or(seg)
1514 .to_string()
1515 })
1516 .unwrap_or_default();
1517
1518 let item = ItemResponse {
1519 kref: rev.item_kref.clone(),
1520 name: item_name.clone(),
1521 item_name,
1522 kind: "workflow".to_string(),
1523 deprecated: false,
1524 created_at: rev.created_at.clone(),
1525 metadata: HashMap::new(),
1526 };
1527
1528 let workflow = to_workflow_response(&item, Some(&rev));
1529 Json(serde_json::json!({ "workflow": workflow })).into_response()
1530}
1531
1532pub async fn handle_list_workflow_runs(
1536 State(state): State<AppState>,
1537 headers: HeaderMap,
1538 Query(query): Query<WorkflowRunsQuery>,
1539) -> impl IntoResponse {
1540 if let Err(e) = require_auth(&state, &headers) {
1541 return e.into_response();
1542 }
1543
1544 let client = build_kumiho_client(&state);
1545 let project = workflow_project(&state);
1546 let runs_space = workflow_runs_space_path(&state);
1547
1548 match client.list_items(&runs_space, false).await {
1549 Ok(mut items) => {
1550 items.retain(|i| i.kind == "workflow_run");
1552
1553 if let Some(ref wf_name) = query.workflow {
1554 items.retain(|item| {
1555 item.metadata
1556 .get("workflow_name")
1557 .or_else(|| item.metadata.get("workflow"))
1558 .map(|n| n == wf_name)
1559 .unwrap_or(false)
1560 });
1561 }
1562
1563 items.sort_by(|a, b| {
1564 let a_time = a.created_at.as_deref().unwrap_or("");
1565 let b_time = b.created_at.as_deref().unwrap_or("");
1566 b_time.cmp(a_time)
1567 });
1568 items.truncate(query.limit);
1569
1570 let krefs: Vec<String> = items.iter().map(|i| i.kref.clone()).collect();
1571 let rev_map = client
1572 .batch_get_revisions(&krefs, "latest")
1573 .await
1574 .unwrap_or_default();
1575
1576 let runs: Vec<WorkflowRunSummary> = items
1577 .iter()
1578 .map(|item| to_run_summary(item, rev_map.get(&item.kref)))
1579 .collect();
1580
1581 Json(serde_json::json!({ "runs": runs, "count": runs.len() })).into_response()
1582 }
1583 Err(ref e) if matches!(e, KumihoError::Api { status: 404, .. }) => {
1584 let _ = client.ensure_project(&project).await;
1585 let _ = client
1586 .ensure_space(&project, WORKFLOW_RUNS_SPACE_NAME)
1587 .await;
1588 Json(serde_json::json!({ "runs": [], "count": 0 })).into_response()
1589 }
1590 Err(e) => {
1591 let msg = format!("Failed to fetch workflow runs: {e}");
1592 (
1593 StatusCode::SERVICE_UNAVAILABLE,
1594 Json(serde_json::json!({ "error": msg })),
1595 )
1596 .into_response()
1597 }
1598 }
1599}
1600
1601pub async fn handle_get_workflow_run(
1603 State(state): State<AppState>,
1604 headers: HeaderMap,
1605 Path(run_id): Path<String>,
1606) -> impl IntoResponse {
1607 if let Err(e) = require_auth(&state, &headers) {
1608 return e.into_response();
1609 }
1610
1611 let client = build_kumiho_client(&state);
1612 let project = workflow_project(&state);
1613 let runs_space = workflow_runs_space_path(&state);
1614
1615 let run_id_prefix = &run_id[..run_id.len().min(12)];
1618
1619 if let Ok(items) = client
1623 .list_items_filtered(&runs_space, run_id_prefix, false)
1624 .await
1625 {
1626 let run_id_lower = run_id.to_lowercase();
1628 let prefix_lower = run_id_lower[..run_id_lower.len().min(12)].to_string();
1629 if let Some(item) = items.iter().find(|i| {
1630 i.kind == "workflow_run" && i.item_name.to_lowercase().contains(&prefix_lower)
1631 }) {
1632 let rev = client.get_latest_revision(&item.kref).await.ok();
1633 let detail = to_run_detail(item, rev.as_ref());
1634 return Json(serde_json::json!({ "run": detail })).into_response();
1635 }
1636 }
1637
1638 if let Ok(results) = client
1641 .search_items(&run_id, &project, "workflow_run", false)
1642 .await
1643 {
1644 if let Some(sr) = results.first() {
1645 let rev = client.get_latest_revision(&sr.item.kref).await.ok();
1646 let detail = to_run_detail(&sr.item, rev.as_ref());
1647 return Json(serde_json::json!({ "run": detail })).into_response();
1648 }
1649 }
1650
1651 match client.list_items(&runs_space, false).await {
1653 Ok(items) => {
1654 let run_id_lower = run_id.to_lowercase();
1655 let found = items.iter().find(|item| {
1656 if item.kind != "workflow_run" {
1657 return false;
1658 }
1659 if let Some(meta_run_id) = item.metadata.get("run_id") {
1661 if meta_run_id == &run_id {
1662 return true;
1663 }
1664 }
1665 let prefix = &run_id_lower[..run_id_lower.len().min(12)];
1667 item.item_name.to_lowercase().contains(prefix)
1668 });
1669
1670 match found {
1671 Some(item) => {
1672 let rev = client.get_latest_revision(&item.kref).await.ok();
1673 let detail = to_run_detail(item, rev.as_ref());
1674 Json(serde_json::json!({ "run": detail })).into_response()
1675 }
1676 None => (
1677 StatusCode::NOT_FOUND,
1678 Json(serde_json::json!({ "error": format!("Run '{run_id}' not found") })),
1679 )
1680 .into_response(),
1681 }
1682 }
1683 Err(e) => {
1684 let msg = format!("Kumiho error looking up run '{run_id}': {e}");
1685 tracing::warn!("{msg}");
1686 (
1687 StatusCode::SERVICE_UNAVAILABLE,
1688 Json(serde_json::json!({ "error": msg })),
1689 )
1690 .into_response()
1691 }
1692 }
1693}
1694
1695pub async fn handle_delete_workflow_run(
1701 State(state): State<AppState>,
1702 headers: HeaderMap,
1703 Path(run_id): Path<String>,
1704) -> impl IntoResponse {
1705 if let Err(e) = require_auth(&state, &headers) {
1706 return e.into_response();
1707 }
1708
1709 let client = build_kumiho_client(&state);
1710 let runs_space = workflow_runs_space_path(&state);
1711
1712 let run_id_prefix = &run_id[..run_id.len().min(12)];
1714
1715 let kref = if let Ok(items) = client
1716 .list_items_filtered(&runs_space, run_id_prefix, false)
1717 .await
1718 {
1719 let run_id_lower = run_id.to_lowercase();
1720 let prefix_lower = run_id_lower[..run_id_lower.len().min(12)].to_string();
1721 items
1722 .iter()
1723 .find(|i| {
1724 i.kind == "workflow_run" && i.item_name.to_lowercase().contains(&prefix_lower)
1725 })
1726 .map(|i| i.kref.clone())
1727 } else {
1728 None
1729 };
1730
1731 let kref = match kref {
1732 Some(k) => k,
1733 None => {
1734 return (
1735 StatusCode::NOT_FOUND,
1736 Json(serde_json::json!({ "error": format!("Run '{run_id}' not found") })),
1737 )
1738 .into_response();
1739 }
1740 };
1741
1742 match client.delete_item(&kref).await {
1743 Ok(()) => StatusCode::NO_CONTENT.into_response(),
1744 Err(e) => {
1745 let msg = format!("Failed to delete run '{run_id}': {e}");
1746 tracing::warn!("{msg}");
1747 kumiho_err(e).into_response()
1748 }
1749 }
1750}
1751
1752pub async fn handle_approve_workflow_run(
1759 State(state): State<AppState>,
1760 headers: HeaderMap,
1761 Path(run_id): Path<String>,
1762 Json(body): Json<ApproveWorkflowBody>,
1763) -> impl IntoResponse {
1764 if let Err(e) = require_auth(&state, &headers) {
1765 return e.into_response();
1766 }
1767
1768 let approved = body.approved;
1769 let feedback = body.feedback.unwrap_or_default();
1770
1771 let claimed = state.approval_registry.try_claim(&run_id);
1775 let cwd = claimed
1776 .as_ref()
1777 .map(|a| a.cwd.clone())
1778 .unwrap_or_else(|| "/tmp".to_string());
1779
1780 if claimed.is_none() {
1781 tracing::info!(
1782 "approve_workflow_run: no registry entry for run_id={run_id} (gateway restart?), \
1783 calling resume_workflow directly"
1784 );
1785 }
1786
1787 let tool_name = format!(
1789 "{}__resume_workflow",
1790 crate::agent::operator::OPERATOR_SERVER_NAME
1791 );
1792 let mut tool_args = serde_json::Map::new();
1793 tool_args.insert(
1794 "run_id".to_string(),
1795 serde_json::Value::String(run_id.clone()),
1796 );
1797 tool_args.insert("approved".to_string(), serde_json::Value::Bool(approved));
1798 tool_args.insert(
1799 "response".to_string(),
1800 serde_json::Value::String(feedback.clone()),
1801 );
1802 tool_args.insert("cwd".to_string(), serde_json::Value::String(cwd));
1803
1804 let mcp_result = if let Some(ref registry) = state.mcp_registry {
1805 let mcp_future = registry.call_tool(&tool_name, serde_json::Value::Object(tool_args));
1806 match tokio::time::timeout(std::time::Duration::from_secs(30), mcp_future).await {
1807 Ok(Ok(result_str)) => Ok(result_str),
1808 Ok(Err(e)) => Err(format!("operator tool call failed: {e:#}")),
1809 Err(_) => Err("operator tool call timed out (30s)".to_string()),
1810 }
1811 } else {
1812 Err("MCP registry not available — operator not connected".to_string())
1813 };
1814
1815 match mcp_result {
1816 Ok(_) => {
1817 let _ = state.event_tx.send(serde_json::json!({
1820 "type": "human_approval_resolved",
1821 "run_id": run_id,
1822 "approved": approved,
1823 "timestamp": chrono::Utc::now().to_rfc3339(),
1824 }));
1825
1826 (
1827 StatusCode::OK,
1828 Json(serde_json::json!({
1829 "status": "ok",
1830 "message": if approved { "Workflow approved" } else { "Workflow rejected" },
1831 "run_id": run_id,
1832 "approved": approved,
1833 })),
1834 )
1835 .into_response()
1836 }
1837 Err(e) => {
1838 tracing::warn!("approve_workflow_run: failed for run_id={run_id}: {e}");
1839 (
1840 StatusCode::BAD_GATEWAY,
1841 Json(serde_json::json!({
1842 "error": format!("Failed to resume workflow: {e}")
1843 })),
1844 )
1845 .into_response()
1846 }
1847 }
1848}
1849
1850#[derive(Deserialize)]
1851pub struct ApproveWorkflowBody {
1852 pub approved: bool,
1853 pub feedback: Option<String>,
1854}
1855
1856pub async fn handle_retry_workflow_run(
1863 State(state): State<AppState>,
1864 headers: HeaderMap,
1865 Path(run_id): Path<String>,
1866 body: Option<Json<RetryWorkflowBody>>,
1867) -> impl IntoResponse {
1868 if let Err(e) = require_auth(&state, &headers) {
1869 return e.into_response();
1870 }
1871
1872 let cwd = body
1873 .and_then(|Json(b)| b.cwd)
1874 .unwrap_or_else(|| "/tmp".to_string());
1875
1876 let tool_name = format!(
1877 "{}__retry_workflow",
1878 crate::agent::operator::OPERATOR_SERVER_NAME
1879 );
1880 let mut tool_args = serde_json::Map::new();
1881 tool_args.insert(
1882 "run_id".to_string(),
1883 serde_json::Value::String(run_id.clone()),
1884 );
1885 tool_args.insert("cwd".to_string(), serde_json::Value::String(cwd));
1886
1887 let mcp_result = if let Some(ref registry) = state.mcp_registry {
1888 let mcp_future = registry.call_tool(&tool_name, serde_json::Value::Object(tool_args));
1889 match tokio::time::timeout(std::time::Duration::from_secs(30), mcp_future).await {
1890 Ok(Ok(result_str)) => Ok(result_str),
1891 Ok(Err(e)) => Err(format!("operator tool call failed: {e:#}")),
1892 Err(_) => Err("operator tool call timed out (30s)".to_string()),
1893 }
1894 } else {
1895 Err("MCP registry not available — operator not connected".to_string())
1896 };
1897
1898 match mcp_result {
1899 Ok(result_str) => {
1900 let _ = state.event_tx.send(serde_json::json!({
1901 "type": "workflow_retry",
1902 "run_id": run_id,
1903 "timestamp": chrono::Utc::now().to_rfc3339(),
1904 }));
1905 let payload = serde_json::from_str::<serde_json::Value>(&result_str)
1906 .unwrap_or_else(|_| serde_json::json!({"raw": result_str}));
1907 (StatusCode::OK, Json(payload)).into_response()
1908 }
1909 Err(e) => {
1910 tracing::warn!("retry_workflow_run: failed for run_id={run_id}: {e}");
1911 (
1912 StatusCode::BAD_GATEWAY,
1913 Json(serde_json::json!({ "error": format!("Failed to retry workflow: {e}") })),
1914 )
1915 .into_response()
1916 }
1917 }
1918}
1919
1920#[derive(Deserialize)]
1921pub struct RetryWorkflowBody {
1922 pub cwd: Option<String>,
1923}
1924
1925pub async fn handle_agent_activity(
1931 State(state): State<AppState>,
1932 headers: HeaderMap,
1933 Path(agent_id): Path<String>,
1934 Query(query): Query<AgentActivityQuery>,
1935) -> impl IntoResponse {
1936 if let Err(e) = require_auth(&state, &headers) {
1937 return e.into_response();
1938 }
1939
1940 let runlogs_dir =
1941 std::path::PathBuf::from(std::env::var("HOME").unwrap_or_else(|_| "/tmp".into()))
1942 .join(".construct/operator_mcp/runlogs");
1943 let path = runlogs_dir.join(format!("{agent_id}.jsonl"));
1944
1945 if !path.exists() {
1946 return (
1947 StatusCode::NOT_FOUND,
1948 Json(serde_json::json!({ "error": "No run log found for this agent" })),
1949 )
1950 .into_response();
1951 }
1952
1953 let content = match std::fs::read_to_string(&path) {
1954 Ok(c) => c,
1955 Err(e) => {
1956 return (
1957 StatusCode::INTERNAL_SERVER_ERROR,
1958 Json(serde_json::json!({ "error": format!("Failed to read log: {e}") })),
1959 )
1960 .into_response();
1961 }
1962 };
1963
1964 let view = query.view.as_deref().unwrap_or("summary");
1965 let limit = query.limit.unwrap_or(100).min(500) as usize;
1966
1967 let entries: Vec<serde_json::Value> = content
1968 .lines()
1969 .filter_map(|line| serde_json::from_str(line).ok())
1970 .collect();
1971
1972 match view {
1973 "tool_calls" => {
1974 let tools: Vec<&serde_json::Value> = entries
1976 .iter()
1977 .filter(|e| e.get("kind").and_then(|v| v.as_str()) == Some("tool_call"))
1978 .collect();
1979 let total = tools.len();
1980 let slice: Vec<_> = tools.into_iter().rev().take(limit).collect();
1981 Json(serde_json::json!({
1982 "agent_id": agent_id,
1983 "view": "tool_calls",
1984 "total": total,
1985 "entries": slice,
1986 }))
1987 .into_response()
1988 }
1989 "messages" => {
1990 let msgs: Vec<&serde_json::Value> = entries
1992 .iter()
1993 .filter(|e| {
1994 let kind = e.get("kind").and_then(|v| v.as_str()).unwrap_or("");
1995 kind == "message" || kind == "user_message"
1996 })
1997 .collect();
1998 let total = msgs.len();
1999 let slice: Vec<_> = msgs.into_iter().rev().take(limit).collect();
2000 Json(serde_json::json!({
2001 "agent_id": agent_id,
2002 "view": "messages",
2003 "total": total,
2004 "entries": slice,
2005 }))
2006 .into_response()
2007 }
2008 "errors" => {
2009 let errs: Vec<&serde_json::Value> = entries
2010 .iter()
2011 .filter(|e| {
2012 let kind = e.get("kind").and_then(|v| v.as_str()).unwrap_or("");
2013 kind == "error"
2014 || kind == "turn_failed"
2015 || e.get("status").and_then(|v| v.as_str()) == Some("failed")
2016 })
2017 .collect();
2018 Json(serde_json::json!({
2019 "agent_id": agent_id,
2020 "view": "errors",
2021 "total": errs.len(),
2022 "entries": errs,
2023 }))
2024 .into_response()
2025 }
2026 "full" => {
2027 let total = entries.len();
2029 let slice: Vec<_> = entries.into_iter().rev().take(limit).collect();
2030 Json(serde_json::json!({
2031 "agent_id": agent_id,
2032 "view": "full",
2033 "total": total,
2034 "entries": slice,
2035 }))
2036 .into_response()
2037 }
2038 _ => {
2039 let header = entries.first().cloned().unwrap_or_default();
2041 let tool_count = entries
2042 .iter()
2043 .filter(|e| e.get("kind").and_then(|v| v.as_str()) == Some("tool_call"))
2044 .count();
2045 let error_count = entries
2046 .iter()
2047 .filter(|e| {
2048 let kind = e.get("kind").and_then(|v| v.as_str()).unwrap_or("");
2049 kind == "error" || kind == "turn_failed"
2050 })
2051 .count();
2052 let last_message = entries
2053 .iter()
2054 .rev()
2055 .find(|e| e.get("kind").and_then(|v| v.as_str()) == Some("message"))
2056 .and_then(|e| e.get("text").and_then(|v| v.as_str()))
2057 .unwrap_or("");
2058 let last_msg_truncated = if last_message.len() > 5000 {
2060 &last_message[..5000]
2061 } else {
2062 last_message
2063 };
2064 let recent_tools: Vec<_> = entries
2066 .iter()
2067 .filter(|e| e.get("kind").and_then(|v| v.as_str()) == Some("tool_call"))
2068 .rev()
2069 .take(20)
2070 .cloned()
2071 .collect();
2072 let mut input_tokens: u64 = 0;
2074 let mut output_tokens: u64 = 0;
2075 let mut total_cost: f64 = 0.0;
2076 for e in &entries {
2077 if e.get("kind").and_then(|v| v.as_str()) == Some("turn_completed") {
2078 if let Some(usage) = e.get("usage") {
2079 input_tokens += usage
2080 .get("inputTokens")
2081 .and_then(|v| v.as_u64())
2082 .unwrap_or(0);
2083 output_tokens += usage
2084 .get("outputTokens")
2085 .and_then(|v| v.as_u64())
2086 .unwrap_or(0);
2087 total_cost += usage
2088 .get("totalCostUsd")
2089 .and_then(|v| v.as_f64())
2090 .unwrap_or(0.0);
2091 }
2092 }
2093 }
2094 Json(serde_json::json!({
2095 "agent_id": agent_id,
2096 "view": "summary",
2097 "title": header.get("title").and_then(|v| v.as_str()).unwrap_or(""),
2098 "agent_type": header.get("agent_type").and_then(|v| v.as_str()).unwrap_or(""),
2099 "total_events": entries.len(),
2100 "tool_call_count": tool_count,
2101 "error_count": error_count,
2102 "last_message": last_msg_truncated,
2103 "recent_tools": recent_tools,
2104 "usage": {
2105 "input_tokens": input_tokens,
2106 "output_tokens": output_tokens,
2107 "total_cost_usd": total_cost,
2108 },
2109 }))
2110 .into_response()
2111 }
2112 }
2113}
2114
2115#[derive(Deserialize)]
2116pub struct AgentActivityQuery {
2117 view: Option<String>,
2118 limit: Option<u32>,
2119}
2120
2121pub async fn handle_workflow_dashboard(
2123 State(state): State<AppState>,
2124 headers: HeaderMap,
2125) -> impl IntoResponse {
2126 if let Err(e) = require_auth(&state, &headers) {
2127 return e.into_response();
2128 }
2129
2130 let client = build_kumiho_client(&state);
2131 let space_path = workflow_space_path(&state);
2132 let runs_space = workflow_runs_space_path(&state);
2133
2134 let definitions = match client.list_items(&space_path, false).await {
2136 Ok(items) => merge_with_builtins(enrich_items(&client, items).await),
2137 Err(_) => merge_with_builtins(Vec::new()),
2138 };
2139 let definitions_count = definitions.len();
2140
2141 let (recent_runs, total_runs) = match client.list_items(&runs_space, false).await {
2143 Ok(mut items) => {
2144 let total = items.len();
2145 items.sort_by(|a, b| {
2146 let a_time = a.created_at.as_deref().unwrap_or("");
2147 let b_time = b.created_at.as_deref().unwrap_or("");
2148 b_time.cmp(a_time)
2149 });
2150 items.truncate(5);
2151
2152 let krefs: Vec<String> = items.iter().map(|i| i.kref.clone()).collect();
2153 let rev_map = client
2154 .batch_get_revisions(&krefs, "latest")
2155 .await
2156 .unwrap_or_default();
2157
2158 let runs: Vec<WorkflowRunSummary> = items
2159 .iter()
2160 .map(|item| to_run_summary(item, rev_map.get(&item.kref)))
2161 .collect();
2162
2163 (runs, total)
2164 }
2165 Err(_) => (Vec::new(), 0),
2166 };
2167
2168 let active_runs = recent_runs
2169 .iter()
2170 .filter(|r| r.status == "running" || r.status == "paused")
2171 .count();
2172
2173 let dashboard = WorkflowDashboard {
2174 definitions_count,
2175 definitions,
2176 active_runs,
2177 recent_runs,
2178 total_runs,
2179 };
2180
2181 Json(serde_json::json!({ "dashboard": dashboard })).into_response()
2182}