1use super::AppState;
18use super::api::require_auth;
19use super::api_agents::build_kumiho_client;
20use super::kumiho_client::{ItemResponse, KumihoClient, KumihoError, RevisionResponse, slugify};
21use axum::{
22 extract::{Path, Query, State},
23 http::{HeaderMap, StatusCode},
24 response::{IntoResponse, Json},
25};
26use parking_lot::Mutex;
27use serde::{Deserialize, Serialize};
28use std::collections::HashMap;
29use std::sync::OnceLock;
30use std::time::Instant;
31
32const WORKFLOW_SPACE_NAME: &str = "Workflows";
33const WORKFLOW_RUNS_SPACE_NAME: &str = "WorkflowRuns";
34const WORKFLOW_RUN_REQUESTS_SPACE_NAME: &str = "WorkflowRunRequests";
35
36fn workflow_project(state: &AppState) -> String {
37 state.config.lock().kumiho.harness_project.clone()
38}
39
40fn workflow_space_path(state: &AppState) -> String {
41 format!("/{}/{}", workflow_project(state), WORKFLOW_SPACE_NAME)
42}
43
44fn workflow_runs_space_path(state: &AppState) -> String {
45 format!("/{}/{}", workflow_project(state), WORKFLOW_RUNS_SPACE_NAME)
46}
47
48fn workflow_run_requests_space_path(state: &AppState) -> String {
49 format!(
50 "/{}/{}",
51 workflow_project(state),
52 WORKFLOW_RUN_REQUESTS_SPACE_NAME
53 )
54}
55
56struct WorkflowCache {
59 workflows: Vec<WorkflowResponse>,
60 include_deprecated: bool,
61 fetched_at: Instant,
62}
63
64static WORKFLOW_CACHE: OnceLock<Mutex<Option<WorkflowCache>>> = OnceLock::new();
65const CACHE_TTL_SECS: u64 = 3;
66
67fn get_cached(include_deprecated: bool) -> Option<Vec<WorkflowResponse>> {
68 let lock = WORKFLOW_CACHE.get_or_init(|| Mutex::new(None));
69 let cache = lock.lock();
70 if let Some(ref c) = *cache {
71 if c.include_deprecated == include_deprecated
72 && c.fetched_at.elapsed().as_secs() < CACHE_TTL_SECS
73 {
74 return Some(c.workflows.clone());
75 }
76 }
77 None
78}
79
80fn set_cached(workflows: &[WorkflowResponse], include_deprecated: bool) {
81 let lock = WORKFLOW_CACHE.get_or_init(|| Mutex::new(None));
82 let mut cache = lock.lock();
83 *cache = Some(WorkflowCache {
84 workflows: workflows.to_vec(),
85 include_deprecated,
86 fetched_at: Instant::now(),
87 });
88}
89
90fn invalidate_cache() {
91 if let Some(lock) = WORKFLOW_CACHE.get() {
92 let mut cache = lock.lock();
93 if let Some(ref mut c) = *cache {
95 c.fetched_at = Instant::now() - std::time::Duration::from_secs(CACHE_TTL_SECS + 1);
96 }
97 }
98}
99
100#[derive(Deserialize)]
103pub struct WorkflowListQuery {
104 #[serde(default)]
105 pub include_deprecated: bool,
106 pub q: Option<String>,
107}
108
109#[derive(Deserialize)]
110pub struct CreateWorkflowBody {
111 pub name: String,
112 pub description: String,
113 pub definition: String,
114 #[serde(default)]
115 pub version: Option<String>,
116 #[serde(default)]
117 pub tags: Option<Vec<String>>,
118}
119
120#[derive(Deserialize)]
121pub struct DeprecateBody {
122 pub kref: String,
123 pub deprecated: bool,
124}
125
126#[derive(Deserialize)]
127pub struct WorkflowRunsQuery {
128 #[serde(default = "default_limit")]
129 pub limit: usize,
130 #[serde(default)]
131 pub workflow: Option<String>,
132}
133
134fn default_limit() -> usize {
135 20
136}
137
138#[derive(Deserialize)]
139pub struct RunWorkflowBody {
140 #[serde(default)]
141 pub inputs: serde_json::Value,
142 #[serde(default)]
143 pub cwd: Option<String>,
144}
145
146#[derive(Serialize, Clone)]
149pub struct WorkflowResponse {
150 pub kref: String,
151 pub name: String,
152 pub item_name: String,
153 pub deprecated: bool,
154 pub created_at: Option<String>,
155 pub description: String,
156 pub definition: String,
157 pub version: String,
158 pub tags: Vec<String>,
159 pub steps: usize,
160 pub revision_number: i32,
161 #[serde(default = "default_source")]
165 pub source: String,
166 #[serde(skip_serializing_if = "Vec::is_empty")]
167 pub triggers: Vec<WorkflowTrigger>,
168}
169
170fn default_source() -> String {
171 "custom".to_string()
172}
173
174#[derive(Serialize, Clone, Debug)]
175pub struct WorkflowTrigger {
176 pub on_kind: String,
177 pub on_tag: String,
178 #[serde(skip_serializing_if = "String::is_empty")]
179 pub on_name_pattern: String,
180}
181
182#[derive(Serialize, Clone)]
183pub struct WorkflowRunSummary {
184 pub kref: String,
185 pub run_id: String,
186 pub workflow_name: String,
187 pub status: String,
188 pub started_at: String,
189 pub completed_at: String,
190 pub steps_completed: String,
191 pub steps_total: String,
192 pub error: String,
193 #[serde(default, skip_serializing_if = "String::is_empty")]
196 pub workflow_item_kref: String,
197 #[serde(default, skip_serializing_if = "String::is_empty")]
201 pub workflow_revision_kref: String,
202}
203
204#[derive(Serialize, Clone)]
205pub struct TranscriptEntry {
206 pub speaker: String,
207 pub content: String,
208 pub round: u32,
209}
210
211#[derive(Serialize, Clone, Default)]
212pub struct ApprovalOutputData {
213 #[serde(skip_serializing_if = "Option::is_none")]
214 pub awaiting_approval: Option<bool>,
215 #[serde(skip_serializing_if = "Option::is_none")]
216 pub approval_message: Option<String>,
217 #[serde(skip_serializing_if = "Vec::is_empty")]
218 pub approve_keywords: Vec<String>,
219 #[serde(skip_serializing_if = "Vec::is_empty")]
220 pub reject_keywords: Vec<String>,
221}
222
223#[derive(Serialize, Clone)]
224pub struct WorkflowStepDetail {
225 pub step_id: String,
226 pub status: String,
227 #[serde(skip_serializing_if = "String::is_empty")]
228 pub agent_id: String,
229 #[serde(skip_serializing_if = "String::is_empty")]
230 pub agent_type: String,
231 #[serde(skip_serializing_if = "String::is_empty")]
232 pub role: String,
233 #[serde(skip_serializing_if = "String::is_empty")]
234 pub template_name: String,
235 #[serde(skip_serializing_if = "String::is_empty")]
236 pub output_preview: String,
237 #[serde(skip_serializing_if = "Vec::is_empty")]
238 pub skills: Vec<String>,
239 #[serde(skip_serializing_if = "Vec::is_empty")]
240 pub transcript: Vec<TranscriptEntry>,
241 #[serde(skip_serializing_if = "Option::is_none")]
242 pub output_data: Option<ApprovalOutputData>,
243}
244
245#[derive(Serialize, Clone)]
246pub struct WorkflowRunDetail {
247 #[serde(flatten)]
248 pub summary: WorkflowRunSummary,
249 pub steps: Vec<WorkflowStepDetail>,
250}
251
252#[derive(Serialize)]
253pub struct WorkflowDashboard {
254 pub definitions_count: usize,
255 pub definitions: Vec<WorkflowResponse>,
256 pub active_runs: usize,
257 pub recent_runs: Vec<WorkflowRunSummary>,
258 pub total_runs: usize,
259}
260
261fn kumiho_err(e: KumihoError) -> (StatusCode, Json<serde_json::Value>) {
264 match &e {
265 KumihoError::Unreachable(_) => (
266 StatusCode::SERVICE_UNAVAILABLE,
267 Json(serde_json::json!({ "error": format!("Kumiho service unavailable: {e}") })),
268 ),
269 KumihoError::Api { status, body } => {
270 let code = if *status == 401 || *status == 403 {
271 StatusCode::BAD_GATEWAY
272 } else {
273 StatusCode::from_u16(*status).unwrap_or(StatusCode::BAD_GATEWAY)
274 };
275 (
276 code,
277 Json(serde_json::json!({ "error": format!("Kumiho upstream: {body}") })),
278 )
279 }
280 KumihoError::Decode(msg) => (
281 StatusCode::BAD_GATEWAY,
282 Json(serde_json::json!({ "error": format!("Bad response from Kumiho: {msg}") })),
283 ),
284 }
285}
286
287fn workflow_metadata(body: &CreateWorkflowBody) -> HashMap<String, String> {
288 let mut meta = HashMap::new();
289 meta.insert("display_name".to_string(), body.name.clone());
290 meta.insert("description".to_string(), body.description.clone());
291 meta.insert("definition".to_string(), body.definition.clone());
292 meta.insert("created_by".to_string(), "construct-dashboard".to_string());
293 let steps = count_yaml_steps(&body.definition);
295 meta.insert("steps".to_string(), steps.to_string());
296 if let Some(ref tags) = body.tags {
297 if !tags.is_empty() {
298 meta.insert("tags".to_string(), tags.join(","));
299 }
300 }
301 meta.insert(
303 "_search_text".to_string(),
304 format!("{} {}", body.name, body.description),
305 );
306 meta
307}
308
309fn count_yaml_steps(content: &str) -> usize {
310 let mut count = 0;
311 let mut in_steps = false;
312 for line in content.lines() {
313 let trimmed = line.trim();
314 if trimmed == "steps:" || trimmed == "tasks:" {
315 in_steps = true;
316 continue;
317 }
318 if in_steps {
319 if trimmed.starts_with("- id:") {
320 count += 1;
321 }
322 if !trimmed.is_empty()
323 && !trimmed.starts_with('-')
324 && !trimmed.starts_with(' ')
325 && !trimmed.starts_with('#')
326 && !line.starts_with(' ')
327 {
328 break;
329 }
330 }
331 }
332 count
333}
334
335fn to_workflow_response(item: &ItemResponse, rev: Option<&RevisionResponse>) -> WorkflowResponse {
336 let meta = rev.map(|r| &r.metadata);
337 let get = |key: &str| -> String { meta.and_then(|m| m.get(key)).cloned().unwrap_or_default() };
338 let tags_str = get("tags");
339 let tags: Vec<String> = if tags_str.is_empty() {
340 Vec::new()
341 } else {
342 tags_str.split(',').map(|s| s.trim().to_string()).collect()
343 };
344 let steps: usize = get("steps").parse().unwrap_or(0);
345
346 let display_name = {
347 let n = get("display_name");
348 if n.is_empty() {
349 item.item_name.clone()
350 } else {
351 n
352 }
353 };
354
355 let definition = get("definition");
356 let triggers = extract_triggers(&definition);
357
358 WorkflowResponse {
359 kref: item.kref.clone(),
360 name: display_name,
361 item_name: item.item_name.clone(),
362 deprecated: item.deprecated,
363 created_at: item.created_at.clone(),
364 description: get("description"),
365 definition,
366 version: format!("{}", rev.map(|r| r.number).unwrap_or(0)),
367 tags,
368 steps,
369 revision_number: rev.map(|r| r.number).unwrap_or(0),
370 source: "custom".to_string(),
371 triggers,
372 }
373}
374
375async fn prefer_artifact_definitions(
383 client: &super::kumiho_client::KumihoClient,
384 revs: &mut HashMap<String, RevisionResponse>,
385) {
386 for rev in revs.values_mut() {
387 if let Ok(artifact) = client
388 .get_artifact_by_name(&rev.kref, "workflow.yaml")
389 .await
390 {
391 let path = artifact
392 .location
393 .strip_prefix("file://")
394 .unwrap_or(&artifact.location);
395 if let Ok(yaml) = tokio::fs::read_to_string(path).await {
396 rev.metadata.insert("definition".to_string(), yaml);
397 }
398 }
399 }
400}
401
402async fn enrich_items(
403 client: &super::kumiho_client::KumihoClient,
404 items: Vec<ItemResponse>,
405) -> Vec<WorkflowResponse> {
406 let items: Vec<ItemResponse> = items.into_iter().filter(|i| i.kind == "workflow").collect();
409
410 if items.is_empty() {
411 return Vec::new();
412 }
413
414 let krefs: Vec<String> = items.iter().map(|i| i.kref.clone()).collect();
415
416 if let Ok(mut rev_map) = client.batch_get_revisions(&krefs, "published").await {
417 let missing: Vec<String> = krefs
418 .iter()
419 .filter(|k| !rev_map.contains_key(*k))
420 .cloned()
421 .collect();
422 let mut latest_map = if !missing.is_empty() {
423 client
424 .batch_get_revisions(&missing, "latest")
425 .await
426 .unwrap_or_default()
427 } else {
428 HashMap::new()
429 };
430
431 prefer_artifact_definitions(client, &mut rev_map).await;
437 prefer_artifact_definitions(client, &mut latest_map).await;
438
439 return items
440 .iter()
441 .map(|item| {
442 let rev = rev_map
443 .get(&item.kref)
444 .or_else(|| latest_map.get(&item.kref));
445 to_workflow_response(item, rev)
446 })
447 .collect();
448 }
449
450 let mut workflows = Vec::with_capacity(items.len());
452 for item in &items {
453 let rev = client.get_published_or_latest(&item.kref).await.ok();
454 workflows.push(to_workflow_response(item, rev.as_ref()));
455 }
456 workflows
457}
458
459fn to_run_summary(item: &ItemResponse, rev: Option<&RevisionResponse>) -> WorkflowRunSummary {
460 let meta = rev.map(|r| &r.metadata);
461 let get = |key: &str| -> String { meta.and_then(|m| m.get(key)).cloned().unwrap_or_default() };
462
463 let run_id_meta = get("run_id");
464 WorkflowRunSummary {
465 kref: item.kref.clone(),
466 run_id: if run_id_meta.is_empty() {
467 item.item_name.clone()
468 } else {
469 run_id_meta
470 },
471 workflow_name: {
472 let wn = get("workflow_name");
473 if wn.is_empty() { get("workflow") } else { wn }
474 },
475 status: get("status"),
476 started_at: get("started_at"),
477 completed_at: get("completed_at"),
478 steps_completed: get("steps_completed"),
479 steps_total: get("steps_total"),
480 error: get("error"),
481 workflow_item_kref: get("workflow_item_kref"),
482 workflow_revision_kref: get("workflow_revision_kref"),
483 }
484}
485
486fn extract_steps_from_metadata(meta: &HashMap<String, String>) -> Vec<WorkflowStepDetail> {
487 const SKIP_KEYS: &[&str] = &["step_count", "steps_completed", "steps_total"];
489
490 let mut steps = Vec::new();
491 for (key, value) in meta {
492 if SKIP_KEYS.contains(&key.as_str()) {
493 continue;
494 }
495 if let Some(step_id) = key.strip_prefix("step_") {
496 if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(value) {
499 if !parsed.is_object() {
501 continue;
502 }
503 let skills = parsed
504 .get("skills")
505 .and_then(|v| v.as_array())
506 .map(|arr| {
507 arr.iter()
508 .filter_map(|s| s.as_str().map(|s| s.to_string()))
509 .collect::<Vec<_>>()
510 })
511 .unwrap_or_default();
512 let transcript = parsed
514 .get("transcript")
515 .and_then(|v| v.as_str())
516 .and_then(|s| serde_json::from_str::<Vec<serde_json::Value>>(s).ok())
517 .map(|arr| {
518 arr.iter()
519 .map(|entry| TranscriptEntry {
520 speaker: entry
521 .get("speaker")
522 .and_then(|v| v.as_str())
523 .unwrap_or("?")
524 .to_string(),
525 content: entry
526 .get("content")
527 .and_then(|v| v.as_str())
528 .unwrap_or("")
529 .to_string(),
530 round: entry.get("round").and_then(|v| v.as_u64()).unwrap_or(0)
531 as u32,
532 })
533 .collect::<Vec<_>>()
534 })
535 .unwrap_or_default();
536 let output_data = parsed.get("output_data").and_then(|v| {
538 let obj = if let Some(s) = v.as_str() {
540 serde_json::from_str::<serde_json::Value>(s).ok()
541 } else {
542 Some(v.clone())
543 };
544 obj.map(|o| ApprovalOutputData {
545 awaiting_approval: o.get("awaiting_approval").and_then(|v| v.as_bool()),
546 approval_message: o
547 .get("approval_message")
548 .and_then(|v| v.as_str())
549 .map(String::from),
550 approve_keywords: o
551 .get("approve_keywords")
552 .and_then(|v| v.as_array())
553 .map(|arr| {
554 arr.iter()
555 .filter_map(|s| s.as_str().map(String::from))
556 .collect()
557 })
558 .unwrap_or_default(),
559 reject_keywords: o
560 .get("reject_keywords")
561 .and_then(|v| v.as_array())
562 .map(|arr| {
563 arr.iter()
564 .filter_map(|s| s.as_str().map(String::from))
565 .collect()
566 })
567 .unwrap_or_default(),
568 })
569 });
570 steps.push(WorkflowStepDetail {
571 step_id: step_id.to_string(),
572 status: parsed
573 .get("status")
574 .and_then(|v| v.as_str())
575 .unwrap_or("unknown")
576 .to_string(),
577 agent_id: parsed
578 .get("agent_id")
579 .and_then(|v| v.as_str())
580 .unwrap_or("")
581 .to_string(),
582 agent_type: parsed
583 .get("agent_type")
584 .and_then(|v| v.as_str())
585 .unwrap_or("")
586 .to_string(),
587 role: parsed
588 .get("role")
589 .and_then(|v| v.as_str())
590 .unwrap_or("")
591 .to_string(),
592 template_name: parsed
593 .get("template_name")
594 .and_then(|v| v.as_str())
595 .unwrap_or("")
596 .to_string(),
597 output_preview: parsed
598 .get("output_preview")
599 .and_then(|v| v.as_str())
600 .unwrap_or("")
601 .to_string(),
602 skills,
603 transcript,
604 output_data,
605 });
606 } else if value.contains(r#""status""#) {
607 let status = if let Some(start) = value.find(r#""status": ""#) {
609 let rest = &value[start + 11..];
610 rest.split('"').next().unwrap_or("unknown")
611 } else {
612 "unknown"
613 };
614 steps.push(WorkflowStepDetail {
615 step_id: step_id.to_string(),
616 status: status.to_string(),
617 agent_id: String::new(),
618 agent_type: String::new(),
619 role: String::new(),
620 template_name: String::new(),
621 output_preview: String::new(),
622 skills: Vec::new(),
623 transcript: Vec::new(),
624 output_data: None,
625 });
626 }
627 }
628 }
629 steps
630}
631
632fn to_run_detail(item: &ItemResponse, rev: Option<&RevisionResponse>) -> WorkflowRunDetail {
633 let summary = to_run_summary(item, rev);
634 let steps = rev
635 .map(|r| extract_steps_from_metadata(&r.metadata))
636 .unwrap_or_default();
637 WorkflowRunDetail { summary, steps }
638}
639
640const BUILTIN_WORKFLOWS_DIR: &str = ".construct/operator_mcp/workflow/builtins";
644
645fn discover_builtin_workflows() -> Vec<WorkflowResponse> {
649 let home = directories::UserDirs::new()
650 .map(|u| u.home_dir().to_path_buf())
651 .unwrap_or_default();
652 let builtins_dir = home.join(BUILTIN_WORKFLOWS_DIR);
653 let Ok(entries) = std::fs::read_dir(&builtins_dir) else {
654 return Vec::new();
655 };
656
657 let mut workflows = Vec::new();
658 for entry in entries.flatten() {
659 let path = entry.path();
660 let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
661 if ext != "yaml" && ext != "yml" {
662 continue;
663 }
664 let Ok(content) = std::fs::read_to_string(&path) else {
665 continue;
666 };
667 let name = extract_yaml_field(&content, "name").unwrap_or_else(|| {
669 path.file_stem()
670 .unwrap_or_default()
671 .to_string_lossy()
672 .into_owned()
673 });
674 let description = extract_yaml_field(&content, "description").unwrap_or_default();
675 let version = extract_yaml_field(&content, "version").unwrap_or_else(|| "1.0".into());
676 let tags_str = extract_yaml_field(&content, "tags").unwrap_or_default();
677 let tags: Vec<String> = if tags_str.is_empty() {
678 Vec::new()
679 } else {
680 tags_str
682 .trim_start_matches('[')
683 .trim_end_matches(']')
684 .split(',')
685 .map(|s| s.trim().trim_matches('"').trim_matches('\'').to_string())
686 .filter(|s| !s.is_empty())
687 .collect()
688 };
689 let steps = count_yaml_steps(&content);
690 let item_name = slugify(&name);
691
692 let triggers = extract_triggers(&content);
693 workflows.push(WorkflowResponse {
694 kref: format!("builtin://{item_name}"),
695 name,
696 item_name,
697 deprecated: false,
698 created_at: None,
699 description,
700 definition: content,
701 version,
702 tags,
703 steps,
704 revision_number: 0,
705 source: "builtin".to_string(),
706 triggers,
707 });
708 }
709 workflows
710}
711
712fn extract_yaml_field(content: &str, field: &str) -> Option<String> {
714 for line in content.lines() {
715 let trimmed = line.trim();
716 if let Some(rest) = trimmed.strip_prefix(field) {
717 if let Some(value) = rest.strip_prefix(':') {
718 let v = value.trim();
719 let v = v.trim_matches('"').trim_matches('\'');
721 if !v.is_empty() {
722 return Some(v.to_string());
723 }
724 }
725 }
726 if trimmed == "steps:" || trimmed == "inputs:" {
728 break;
729 }
730 }
731 None
732}
733
734fn extract_triggers(content: &str) -> Vec<WorkflowTrigger> {
739 let mut triggers = Vec::new();
740 let mut in_triggers = false;
741 let mut current_kind = String::new();
742 let mut current_tag = String::new();
743 let mut current_pattern = String::new();
744
745 for line in content.lines() {
746 let trimmed = line.trim();
747 if trimmed == "triggers:" {
748 in_triggers = true;
749 continue;
750 }
751 if !in_triggers {
752 continue;
753 }
754 if !trimmed.is_empty()
756 && !trimmed.starts_with('-')
757 && !trimmed.starts_with('#')
758 && !line.starts_with(' ')
759 && !line.starts_with('\t')
760 {
761 break;
762 }
763 if trimmed.starts_with("- ") {
765 if !current_kind.is_empty() {
766 triggers.push(WorkflowTrigger {
767 on_kind: std::mem::take(&mut current_kind),
768 on_tag: if current_tag.is_empty() {
769 "ready".to_string()
770 } else {
771 std::mem::take(&mut current_tag)
772 },
773 on_name_pattern: std::mem::take(&mut current_pattern),
774 });
775 }
776 let after_dash = trimmed.strip_prefix("- ").unwrap_or("");
778 if let Some((k, v)) = after_dash.split_once(':') {
779 let k = k.trim();
780 let v = v.trim().trim_matches('"').trim_matches('\'');
781 match k {
782 "on_kind" => current_kind = v.to_string(),
783 "on_tag" => current_tag = v.to_string(),
784 "on_name_pattern" => current_pattern = v.to_string(),
785 _ => {}
786 }
787 }
788 continue;
789 }
790 if let Some((k, v)) = trimmed.split_once(':') {
792 let k = k.trim();
793 let v = v.trim().trim_matches('"').trim_matches('\'');
794 match k {
795 "on_kind" => current_kind = v.to_string(),
796 "on_tag" => current_tag = v.to_string(),
797 "on_name_pattern" => current_pattern = v.to_string(),
798 _ => {}
799 }
800 }
801 }
802 if !current_kind.is_empty() {
804 triggers.push(WorkflowTrigger {
805 on_kind: current_kind,
806 on_tag: if current_tag.is_empty() {
807 "ready".to_string()
808 } else {
809 current_tag
810 },
811 on_name_pattern: current_pattern,
812 });
813 }
814 triggers
815}
816
817fn extract_cron_triggers(content: &str) -> Vec<(String, Option<String>)> {
822 let mut results = Vec::new();
823 let mut in_triggers = false;
824 let mut current_cron = String::new();
825 let mut current_tz: Option<String> = None;
826
827 for line in content.lines() {
828 let trimmed = line.trim();
829 if trimmed == "triggers:" {
830 in_triggers = true;
831 continue;
832 }
833 if !in_triggers {
834 continue;
835 }
836 if !trimmed.is_empty()
838 && !trimmed.starts_with('-')
839 && !trimmed.starts_with('#')
840 && !line.starts_with(' ')
841 && !line.starts_with('\t')
842 {
843 break;
844 }
845 if trimmed.starts_with("- ") {
847 if !current_cron.is_empty() {
848 results.push((std::mem::take(&mut current_cron), current_tz.take()));
849 }
850 let after_dash = trimmed.strip_prefix("- ").unwrap_or("");
852 if let Some((k, v)) = after_dash.split_once(':') {
853 let k = k.trim();
854 let v = v.trim().trim_matches('"').trim_matches('\'');
855 match k {
856 "cron" if !v.is_empty() => current_cron = v.to_string(),
857 "timezone" | "tz" if !v.is_empty() => current_tz = Some(v.to_string()),
858 _ => {}
859 }
860 }
861 continue;
862 }
863 if let Some((k, v)) = trimmed.split_once(':') {
865 let k = k.trim();
866 let v = v.trim().trim_matches('"').trim_matches('\'');
867 match k {
868 "cron" if !v.is_empty() => current_cron = v.to_string(),
869 "timezone" | "tz" if !v.is_empty() => current_tz = Some(v.to_string()),
870 _ => {}
871 }
872 }
873 }
874 if !current_cron.is_empty() {
876 results.push((current_cron, current_tz));
877 }
878 results
879}
880
881async fn persist_workflow_artifact(
887 client: &KumihoClient,
888 revision_kref: &str,
889 revision_number: i32,
890 workflow_name: &str,
891 definition: &str,
892) {
893 let home = directories::UserDirs::new()
894 .map(|u| u.home_dir().to_path_buf())
895 .unwrap_or_default();
896 let dir = home.join(".construct/workflows");
897 let _ = tokio::fs::create_dir_all(&dir).await;
898
899 let slug = slugify(workflow_name);
900 let file_path = dir.join(format!("{slug}.r{revision_number}.yaml"));
901 let location = format!("file://{}", file_path.display());
902
903 if let Err(e) = tokio::fs::write(&file_path, definition).await {
904 tracing::warn!("Failed to write workflow YAML for {workflow_name}: {e}");
905 return;
906 }
907
908 if let Err(e) = client
909 .create_artifact(revision_kref, "workflow.yaml", &location, HashMap::new())
910 .await
911 {
912 tracing::warn!("Failed to create artifact for workflow {workflow_name}: {e}");
913 } else {
914 tracing::info!("Persisted workflow artifact: {location}");
915 }
916}
917
918fn sync_cron_for_workflow(state: &AppState, workflow_name: &str, definition: &str) {
919 let cron_triggers = extract_cron_triggers(definition);
920 let config = state.config.lock();
921
922 if let Err(e) = crate::cron::remove_workflow_cron_jobs(&config, workflow_name) {
924 tracing::warn!("Failed to remove old cron jobs for workflow {workflow_name}: {e}");
925 }
926
927 if cron_triggers.is_empty() {
928 return;
929 }
930
931 let wf_crons: Vec<(String, String, Option<String>)> = cron_triggers
932 .into_iter()
933 .map(|(expr, tz)| (workflow_name.to_string(), expr, tz))
934 .collect();
935
936 if let Err(e) = crate::cron::sync_workflow_cron_jobs(&config, &wf_crons) {
937 tracing::warn!("Failed to sync cron triggers for workflow {workflow_name}: {e}");
938 }
939}
940
941fn merge_with_builtins(mut kumiho_workflows: Vec<WorkflowResponse>) -> Vec<WorkflowResponse> {
947 let builtins = discover_builtin_workflows();
948 if builtins.is_empty() {
949 return kumiho_workflows;
950 }
951
952 let builtin_names: std::collections::HashSet<String> =
953 builtins.iter().map(|b| b.item_name.clone()).collect();
954
955 for wf in &mut kumiho_workflows {
957 if builtin_names.contains(&wf.item_name) {
958 wf.source = "builtin-modified".to_string();
959 }
960 }
961
962 let kumiho_names: std::collections::HashSet<String> = kumiho_workflows
964 .iter()
965 .map(|w| w.item_name.clone())
966 .collect();
967 for builtin in builtins {
968 if !kumiho_names.contains(&builtin.item_name) {
969 kumiho_workflows.push(builtin);
970 }
971 }
972
973 kumiho_workflows
974}
975
976pub async fn handle_list_workflows(
980 State(state): State<AppState>,
981 headers: HeaderMap,
982 Query(query): Query<WorkflowListQuery>,
983) -> impl IntoResponse {
984 if let Err(e) = require_auth(&state, &headers) {
985 return e.into_response();
986 }
987
988 let client = build_kumiho_client(&state);
989 let project = workflow_project(&state);
990 let space_path = workflow_space_path(&state);
991
992 if query.q.is_none() {
994 if let Some(cached) = get_cached(query.include_deprecated) {
995 return Json(serde_json::json!({ "workflows": cached })).into_response();
996 }
997 }
998
999 let items_result = if let Some(ref q) = query.q {
1000 client
1001 .search_items(q, &project, "workflow", query.include_deprecated)
1002 .await
1003 .map(|results| results.into_iter().map(|sr| sr.item).collect::<Vec<_>>())
1004 } else {
1005 client
1006 .list_items(&space_path, query.include_deprecated)
1007 .await
1008 };
1009
1010 match items_result {
1011 Ok(items) => {
1012 let workflows = merge_with_builtins(enrich_items(&client, items).await);
1013 if query.q.is_none() {
1014 set_cached(&workflows, query.include_deprecated);
1015 }
1016 Json(serde_json::json!({ "workflows": workflows })).into_response()
1017 }
1018 Err(ref e) if matches!(e, KumihoError::Api { status: 404, .. }) => {
1019 let _ = client.ensure_project(&project).await;
1020 let _ = client.ensure_space(&project, WORKFLOW_SPACE_NAME).await;
1021 let workflows = merge_with_builtins(Vec::new());
1022 Json(serde_json::json!({ "workflows": workflows })).into_response()
1023 }
1024 Err(e) => {
1025 if query.q.is_none() {
1027 let lock = WORKFLOW_CACHE.get_or_init(|| Mutex::new(None));
1028 let cache = lock.lock();
1029 if let Some(ref c) = *cache {
1030 tracing::warn!("Workflows list failed, returning stale cache: {e}");
1031 return Json(serde_json::json!({ "workflows": c.workflows })).into_response();
1032 }
1033 }
1034 kumiho_err(e).into_response()
1035 }
1036 }
1037}
1038
1039pub async fn handle_create_workflow(
1041 State(state): State<AppState>,
1042 headers: HeaderMap,
1043 Json(body): Json<CreateWorkflowBody>,
1044) -> impl IntoResponse {
1045 if let Err(e) = require_auth(&state, &headers) {
1046 return e.into_response();
1047 }
1048
1049 let client = build_kumiho_client(&state);
1050 let project = workflow_project(&state);
1051 let space_path = workflow_space_path(&state);
1052
1053 if let Err(e) = client.ensure_project(&project).await {
1054 return kumiho_err(e).into_response();
1055 }
1056 if let Err(e) = client.ensure_space(&project, WORKFLOW_SPACE_NAME).await {
1057 return kumiho_err(e).into_response();
1058 }
1059
1060 let slug = slugify(&body.name);
1061 let item = match client
1062 .create_item(&space_path, &slug, "workflow", HashMap::new())
1063 .await
1064 {
1065 Ok(item) => item,
1066 Err(e) => return kumiho_err(e).into_response(),
1067 };
1068
1069 let metadata = workflow_metadata(&body);
1070 let rev = match client.create_revision(&item.kref, metadata).await {
1071 Ok(rev) => rev,
1072 Err(e) => return kumiho_err(e).into_response(),
1073 };
1074
1075 persist_workflow_artifact(&client, &rev.kref, rev.number, &body.name, &body.definition).await;
1077 let _ = client.tag_revision(&rev.kref, "published").await;
1078
1079 invalidate_cache();
1080 sync_cron_for_workflow(&state, &body.name, &body.definition);
1081
1082 let workflow = to_workflow_response(&item, Some(&rev));
1083 (
1084 StatusCode::CREATED,
1085 Json(serde_json::json!({ "workflow": workflow })),
1086 )
1087 .into_response()
1088}
1089
1090pub async fn handle_update_workflow(
1092 State(state): State<AppState>,
1093 headers: HeaderMap,
1094 Path(kref): Path<String>,
1095 Json(body): Json<CreateWorkflowBody>,
1096) -> impl IntoResponse {
1097 if let Err(e) = require_auth(&state, &headers) {
1098 return e.into_response();
1099 }
1100
1101 let kref = format!("kref://{kref}");
1102 let client = build_kumiho_client(&state);
1103
1104 let metadata = workflow_metadata(&body);
1105 let rev = match client.create_revision(&kref, metadata).await {
1106 Ok(rev) => rev,
1107 Err(e) => return kumiho_err(e).into_response(),
1108 };
1109
1110 persist_workflow_artifact(&client, &rev.kref, rev.number, &body.name, &body.definition).await;
1112 let _ = client.tag_revision(&rev.kref, "published").await;
1113
1114 let items = match client.list_items(&workflow_space_path(&state), true).await {
1115 Ok(items) => items,
1116 Err(e) => return kumiho_err(e).into_response(),
1117 };
1118
1119 invalidate_cache();
1120 sync_cron_for_workflow(&state, &body.name, &body.definition);
1121
1122 let item = items.iter().find(|i| i.kref == kref);
1123 match item {
1124 Some(item) => {
1125 let workflow = to_workflow_response(item, Some(&rev));
1126 Json(serde_json::json!({ "workflow": workflow })).into_response()
1127 }
1128 None => {
1129 let fallback = ItemResponse {
1130 kref: kref.clone(),
1131 name: body.name.clone(),
1132 item_name: body.name.clone(),
1133 kind: "workflow".to_string(),
1134 deprecated: false,
1135 created_at: None,
1136 metadata: HashMap::new(),
1137 };
1138 let workflow = to_workflow_response(&fallback, Some(&rev));
1139 Json(serde_json::json!({ "workflow": workflow })).into_response()
1140 }
1141 }
1142}
1143
1144pub async fn handle_deprecate_workflow(
1146 State(state): State<AppState>,
1147 headers: HeaderMap,
1148 Json(body): Json<DeprecateBody>,
1149) -> impl IntoResponse {
1150 if let Err(e) = require_auth(&state, &headers) {
1151 return e.into_response();
1152 }
1153
1154 let kref = body.kref.clone();
1155 let client = build_kumiho_client(&state);
1156
1157 match client.deprecate_item(&kref, body.deprecated).await {
1158 Ok(item) => {
1159 invalidate_cache();
1160 let rev = client.get_published_or_latest(&kref).await.ok();
1161
1162 if body.deprecated {
1164 if let Some(item_segment) = kref.split('/').last() {
1166 let workflow_name = item_segment
1167 .rsplit_once('.')
1168 .map(|(name, _kind)| name)
1169 .unwrap_or(item_segment);
1170 let config = state.config.lock();
1171 if let Err(e) = crate::cron::remove_workflow_cron_jobs(&config, workflow_name) {
1172 tracing::warn!("Failed to remove cron jobs for deprecated workflow: {e}");
1173 }
1174 }
1175 } else if let Some(ref rev) = rev {
1176 if let Some(definition) = rev.metadata.get("definition") {
1178 sync_cron_for_workflow(&state, &item.item_name, definition);
1179 }
1180 }
1181
1182 let workflow = to_workflow_response(&item, rev.as_ref());
1183 Json(serde_json::json!({ "workflow": workflow })).into_response()
1184 }
1185 Err(e) => kumiho_err(e).into_response(),
1186 }
1187}
1188
1189pub async fn handle_delete_workflow(
1191 State(state): State<AppState>,
1192 headers: HeaderMap,
1193 Path(kref): Path<String>,
1194) -> impl IntoResponse {
1195 if let Err(e) = require_auth(&state, &headers) {
1196 return e.into_response();
1197 }
1198
1199 let kref = format!("kref://{kref}");
1200 let client = build_kumiho_client(&state);
1201
1202 match client.delete_item(&kref).await {
1203 Ok(()) => {
1204 invalidate_cache();
1205
1206 if let Some(item_segment) = kref.split('/').last() {
1210 let workflow_name = item_segment
1211 .rsplit_once('.')
1212 .map(|(name, _kind)| name)
1213 .unwrap_or(item_segment);
1214 let config = state.config.lock();
1215 if let Err(e) = crate::cron::remove_workflow_cron_jobs(&config, workflow_name) {
1216 tracing::warn!("Failed to remove cron jobs for deleted workflow: {e}");
1217 }
1218 }
1219
1220 StatusCode::NO_CONTENT.into_response()
1221 }
1222 Err(e) => kumiho_err(e).into_response(),
1223 }
1224}
1225
1226pub async fn handle_run_workflow(
1231 State(state): State<AppState>,
1232 headers: HeaderMap,
1233 Path(name): Path<String>,
1234 body: Option<Json<RunWorkflowBody>>,
1235) -> impl IntoResponse {
1236 if let Err(e) = require_auth(&state, &headers) {
1237 return e.into_response();
1238 }
1239
1240 let inputs = body
1241 .as_ref()
1242 .map(|b| b.inputs.clone())
1243 .unwrap_or(serde_json::Value::Object(Default::default()));
1244 let cwd = body.as_ref().and_then(|b| b.cwd.clone());
1245
1246 let run_id = uuid::Uuid::new_v4().to_string();
1247 let now = chrono::Utc::now().to_rfc3339();
1248
1249 let client = build_kumiho_client(&state);
1250 let project = workflow_project(&state);
1251 let requests_space_path = workflow_run_requests_space_path(&state);
1252
1253 let _ = client.ensure_project(&project).await;
1255 let _ = client
1256 .ensure_space(&project, WORKFLOW_RUN_REQUESTS_SPACE_NAME)
1257 .await;
1258
1259 let mut metadata = HashMap::new();
1260 metadata.insert("workflow_name".to_string(), name.clone());
1261 metadata.insert("run_id".to_string(), run_id.clone());
1262 metadata.insert("inputs".to_string(), inputs.to_string());
1263 metadata.insert("cwd".to_string(), cwd.unwrap_or_default());
1264 metadata.insert("trigger_source".to_string(), "api".to_string());
1265 metadata.insert("requested_at".to_string(), now);
1266
1267 let item_name = format!("run-{}", &run_id[..run_id.len().min(12)]);
1268
1269 match client
1270 .create_item(
1271 &requests_space_path,
1272 &item_name,
1273 "workflow-run-request",
1274 metadata.clone(),
1275 )
1276 .await
1277 {
1278 Ok(item) => {
1279 if let Ok(rev) = client.create_revision(&item.kref, metadata).await {
1280 let _ = client.tag_revision(&rev.kref, "pending").await;
1281 }
1282 (
1283 StatusCode::OK,
1284 Json(serde_json::json!({
1285 "run_id": run_id,
1286 "workflow": name,
1287 "status": "pending",
1288 })),
1289 )
1290 .into_response()
1291 }
1292 Err(e) => {
1293 tracing::warn!("Failed to create workflow run request: {e}");
1294 (
1295 StatusCode::INTERNAL_SERVER_ERROR,
1296 Json(serde_json::json!({
1297 "error": format!("Failed to create run request: {e}")
1298 })),
1299 )
1300 .into_response()
1301 }
1302 }
1303}
1304
1305pub async fn handle_get_workflow_by_revision(
1312 State(state): State<AppState>,
1313 headers: HeaderMap,
1314 Path(kref): Path<String>,
1315) -> impl IntoResponse {
1316 if let Err(e) = require_auth(&state, &headers) {
1317 return e.into_response();
1318 }
1319
1320 let revision_kref = if kref.starts_with("kref://") {
1321 kref.clone()
1322 } else {
1323 format!("kref://{kref}")
1324 };
1325
1326 let client = build_kumiho_client(&state);
1327
1328 let mut rev = match client.get_revision(&revision_kref).await {
1329 Ok(r) => r,
1330 Err(e) => return kumiho_err(e).into_response(),
1331 };
1332
1333 if let Ok(artifact) = client
1339 .get_artifact_by_name(&rev.kref, "workflow.yaml")
1340 .await
1341 {
1342 let path = artifact
1343 .location
1344 .strip_prefix("file://")
1345 .unwrap_or(&artifact.location);
1346 if let Ok(yaml) = tokio::fs::read_to_string(path).await {
1347 rev.metadata.insert("definition".to_string(), yaml);
1348 }
1349 }
1350
1351 let item_name = rev
1354 .item_kref
1355 .rsplit('/')
1356 .next()
1357 .map(|seg| {
1358 seg.rsplit_once('.')
1359 .map(|(n, _)| n)
1360 .unwrap_or(seg)
1361 .to_string()
1362 })
1363 .unwrap_or_default();
1364
1365 let item = ItemResponse {
1366 kref: rev.item_kref.clone(),
1367 name: item_name.clone(),
1368 item_name,
1369 kind: "workflow".to_string(),
1370 deprecated: false,
1371 created_at: rev.created_at.clone(),
1372 metadata: HashMap::new(),
1373 };
1374
1375 let workflow = to_workflow_response(&item, Some(&rev));
1376 Json(serde_json::json!({ "workflow": workflow })).into_response()
1377}
1378
1379pub async fn handle_list_workflow_runs(
1383 State(state): State<AppState>,
1384 headers: HeaderMap,
1385 Query(query): Query<WorkflowRunsQuery>,
1386) -> impl IntoResponse {
1387 if let Err(e) = require_auth(&state, &headers) {
1388 return e.into_response();
1389 }
1390
1391 let client = build_kumiho_client(&state);
1392 let project = workflow_project(&state);
1393 let runs_space = workflow_runs_space_path(&state);
1394
1395 match client.list_items(&runs_space, false).await {
1396 Ok(mut items) => {
1397 items.retain(|i| i.kind == "workflow_run");
1399
1400 if let Some(ref wf_name) = query.workflow {
1401 items.retain(|item| {
1402 item.metadata
1403 .get("workflow_name")
1404 .or_else(|| item.metadata.get("workflow"))
1405 .map(|n| n == wf_name)
1406 .unwrap_or(false)
1407 });
1408 }
1409
1410 items.sort_by(|a, b| {
1411 let a_time = a.created_at.as_deref().unwrap_or("");
1412 let b_time = b.created_at.as_deref().unwrap_or("");
1413 b_time.cmp(a_time)
1414 });
1415 items.truncate(query.limit);
1416
1417 let krefs: Vec<String> = items.iter().map(|i| i.kref.clone()).collect();
1418 let rev_map = client
1419 .batch_get_revisions(&krefs, "latest")
1420 .await
1421 .unwrap_or_default();
1422
1423 let runs: Vec<WorkflowRunSummary> = items
1424 .iter()
1425 .map(|item| to_run_summary(item, rev_map.get(&item.kref)))
1426 .collect();
1427
1428 Json(serde_json::json!({ "runs": runs, "count": runs.len() })).into_response()
1429 }
1430 Err(ref e) if matches!(e, KumihoError::Api { status: 404, .. }) => {
1431 let _ = client.ensure_project(&project).await;
1432 let _ = client.ensure_space(&project, WORKFLOW_RUNS_SPACE_NAME).await;
1433 Json(serde_json::json!({ "runs": [], "count": 0 })).into_response()
1434 }
1435 Err(e) => {
1436 let msg = format!("Failed to fetch workflow runs: {e}");
1437 (
1438 StatusCode::SERVICE_UNAVAILABLE,
1439 Json(serde_json::json!({ "error": msg })),
1440 )
1441 .into_response()
1442 }
1443 }
1444}
1445
1446pub async fn handle_get_workflow_run(
1448 State(state): State<AppState>,
1449 headers: HeaderMap,
1450 Path(run_id): Path<String>,
1451) -> impl IntoResponse {
1452 if let Err(e) = require_auth(&state, &headers) {
1453 return e.into_response();
1454 }
1455
1456 let client = build_kumiho_client(&state);
1457 let project = workflow_project(&state);
1458 let runs_space = workflow_runs_space_path(&state);
1459
1460 let run_id_prefix = &run_id[..run_id.len().min(12)];
1463
1464 if let Ok(items) = client
1468 .list_items_filtered(&runs_space, run_id_prefix, false)
1469 .await
1470 {
1471 let run_id_lower = run_id.to_lowercase();
1473 let prefix_lower = run_id_lower[..run_id_lower.len().min(12)].to_string();
1474 if let Some(item) = items.iter().find(|i| {
1475 i.kind == "workflow_run" && i.item_name.to_lowercase().contains(&prefix_lower)
1476 }) {
1477 let rev = client.get_latest_revision(&item.kref).await.ok();
1478 let detail = to_run_detail(item, rev.as_ref());
1479 return Json(serde_json::json!({ "run": detail })).into_response();
1480 }
1481 }
1482
1483 if let Ok(results) = client
1486 .search_items(&run_id, &project, "workflow_run", false)
1487 .await
1488 {
1489 if let Some(sr) = results.first() {
1490 let rev = client.get_latest_revision(&sr.item.kref).await.ok();
1491 let detail = to_run_detail(&sr.item, rev.as_ref());
1492 return Json(serde_json::json!({ "run": detail })).into_response();
1493 }
1494 }
1495
1496 match client.list_items(&runs_space, false).await {
1498 Ok(items) => {
1499 let run_id_lower = run_id.to_lowercase();
1500 let found = items.iter().find(|item| {
1501 if item.kind != "workflow_run" {
1502 return false;
1503 }
1504 if let Some(meta_run_id) = item.metadata.get("run_id") {
1506 if meta_run_id == &run_id {
1507 return true;
1508 }
1509 }
1510 let prefix = &run_id_lower[..run_id_lower.len().min(12)];
1512 item.item_name.to_lowercase().contains(prefix)
1513 });
1514
1515 match found {
1516 Some(item) => {
1517 let rev = client.get_latest_revision(&item.kref).await.ok();
1518 let detail = to_run_detail(item, rev.as_ref());
1519 Json(serde_json::json!({ "run": detail })).into_response()
1520 }
1521 None => (
1522 StatusCode::NOT_FOUND,
1523 Json(serde_json::json!({ "error": format!("Run '{run_id}' not found") })),
1524 )
1525 .into_response(),
1526 }
1527 }
1528 Err(e) => {
1529 let msg = format!("Kumiho error looking up run '{run_id}': {e}");
1530 tracing::warn!("{msg}");
1531 (
1532 StatusCode::SERVICE_UNAVAILABLE,
1533 Json(serde_json::json!({ "error": msg })),
1534 )
1535 .into_response()
1536 }
1537 }
1538}
1539
1540pub async fn handle_delete_workflow_run(
1546 State(state): State<AppState>,
1547 headers: HeaderMap,
1548 Path(run_id): Path<String>,
1549) -> impl IntoResponse {
1550 if let Err(e) = require_auth(&state, &headers) {
1551 return e.into_response();
1552 }
1553
1554 let client = build_kumiho_client(&state);
1555 let runs_space = workflow_runs_space_path(&state);
1556
1557 let run_id_prefix = &run_id[..run_id.len().min(12)];
1559
1560 let kref = if let Ok(items) = client
1561 .list_items_filtered(&runs_space, run_id_prefix, false)
1562 .await
1563 {
1564 let run_id_lower = run_id.to_lowercase();
1565 let prefix_lower = run_id_lower[..run_id_lower.len().min(12)].to_string();
1566 items
1567 .iter()
1568 .find(|i| {
1569 i.kind == "workflow_run" && i.item_name.to_lowercase().contains(&prefix_lower)
1570 })
1571 .map(|i| i.kref.clone())
1572 } else {
1573 None
1574 };
1575
1576 let kref = match kref {
1577 Some(k) => k,
1578 None => {
1579 return (
1580 StatusCode::NOT_FOUND,
1581 Json(serde_json::json!({ "error": format!("Run '{run_id}' not found") })),
1582 )
1583 .into_response();
1584 }
1585 };
1586
1587 match client.delete_item(&kref).await {
1588 Ok(()) => StatusCode::NO_CONTENT.into_response(),
1589 Err(e) => {
1590 let msg = format!("Failed to delete run '{run_id}': {e}");
1591 tracing::warn!("{msg}");
1592 kumiho_err(e).into_response()
1593 }
1594 }
1595}
1596
1597pub async fn handle_approve_workflow_run(
1604 State(state): State<AppState>,
1605 headers: HeaderMap,
1606 Path(run_id): Path<String>,
1607 Json(body): Json<ApproveWorkflowBody>,
1608) -> impl IntoResponse {
1609 if let Err(e) = require_auth(&state, &headers) {
1610 return e.into_response();
1611 }
1612
1613 let approved = body.approved;
1614 let feedback = body.feedback.unwrap_or_default();
1615
1616 let claimed = state.approval_registry.try_claim(&run_id);
1620 let cwd = claimed
1621 .as_ref()
1622 .map(|a| a.cwd.clone())
1623 .unwrap_or_else(|| "/tmp".to_string());
1624
1625 if claimed.is_none() {
1626 tracing::info!(
1627 "approve_workflow_run: no registry entry for run_id={run_id} (gateway restart?), \
1628 calling resume_workflow directly"
1629 );
1630 }
1631
1632 let tool_name = format!(
1634 "{}__resume_workflow",
1635 crate::agent::operator::OPERATOR_SERVER_NAME
1636 );
1637 let mut tool_args = serde_json::Map::new();
1638 tool_args.insert(
1639 "run_id".to_string(),
1640 serde_json::Value::String(run_id.clone()),
1641 );
1642 tool_args.insert("approved".to_string(), serde_json::Value::Bool(approved));
1643 tool_args.insert(
1644 "response".to_string(),
1645 serde_json::Value::String(feedback.clone()),
1646 );
1647 tool_args.insert("cwd".to_string(), serde_json::Value::String(cwd));
1648
1649 let mcp_result = if let Some(ref registry) = state.mcp_registry {
1650 let mcp_future = registry.call_tool(&tool_name, serde_json::Value::Object(tool_args));
1651 match tokio::time::timeout(std::time::Duration::from_secs(30), mcp_future).await {
1652 Ok(Ok(result_str)) => Ok(result_str),
1653 Ok(Err(e)) => Err(format!("operator tool call failed: {e:#}")),
1654 Err(_) => Err("operator tool call timed out (30s)".to_string()),
1655 }
1656 } else {
1657 Err("MCP registry not available — operator not connected".to_string())
1658 };
1659
1660 match mcp_result {
1661 Ok(_) => {
1662 let _ = state.event_tx.send(serde_json::json!({
1665 "type": "human_approval_resolved",
1666 "run_id": run_id,
1667 "approved": approved,
1668 "timestamp": chrono::Utc::now().to_rfc3339(),
1669 }));
1670
1671 (
1672 StatusCode::OK,
1673 Json(serde_json::json!({
1674 "status": "ok",
1675 "message": if approved { "Workflow approved" } else { "Workflow rejected" },
1676 "run_id": run_id,
1677 "approved": approved,
1678 })),
1679 )
1680 .into_response()
1681 }
1682 Err(e) => {
1683 tracing::warn!("approve_workflow_run: failed for run_id={run_id}: {e}");
1684 (
1685 StatusCode::BAD_GATEWAY,
1686 Json(serde_json::json!({
1687 "error": format!("Failed to resume workflow: {e}")
1688 })),
1689 )
1690 .into_response()
1691 }
1692 }
1693}
1694
1695#[derive(Deserialize)]
1696pub struct ApproveWorkflowBody {
1697 pub approved: bool,
1698 pub feedback: Option<String>,
1699}
1700
1701pub async fn handle_retry_workflow_run(
1708 State(state): State<AppState>,
1709 headers: HeaderMap,
1710 Path(run_id): Path<String>,
1711 body: Option<Json<RetryWorkflowBody>>,
1712) -> impl IntoResponse {
1713 if let Err(e) = require_auth(&state, &headers) {
1714 return e.into_response();
1715 }
1716
1717 let cwd = body
1718 .and_then(|Json(b)| b.cwd)
1719 .unwrap_or_else(|| "/tmp".to_string());
1720
1721 let tool_name = format!(
1722 "{}__retry_workflow",
1723 crate::agent::operator::OPERATOR_SERVER_NAME
1724 );
1725 let mut tool_args = serde_json::Map::new();
1726 tool_args.insert(
1727 "run_id".to_string(),
1728 serde_json::Value::String(run_id.clone()),
1729 );
1730 tool_args.insert("cwd".to_string(), serde_json::Value::String(cwd));
1731
1732 let mcp_result = if let Some(ref registry) = state.mcp_registry {
1733 let mcp_future = registry.call_tool(&tool_name, serde_json::Value::Object(tool_args));
1734 match tokio::time::timeout(std::time::Duration::from_secs(30), mcp_future).await {
1735 Ok(Ok(result_str)) => Ok(result_str),
1736 Ok(Err(e)) => Err(format!("operator tool call failed: {e:#}")),
1737 Err(_) => Err("operator tool call timed out (30s)".to_string()),
1738 }
1739 } else {
1740 Err("MCP registry not available — operator not connected".to_string())
1741 };
1742
1743 match mcp_result {
1744 Ok(result_str) => {
1745 let _ = state.event_tx.send(serde_json::json!({
1746 "type": "workflow_retry",
1747 "run_id": run_id,
1748 "timestamp": chrono::Utc::now().to_rfc3339(),
1749 }));
1750 let payload = serde_json::from_str::<serde_json::Value>(&result_str)
1751 .unwrap_or_else(|_| serde_json::json!({"raw": result_str}));
1752 (StatusCode::OK, Json(payload)).into_response()
1753 }
1754 Err(e) => {
1755 tracing::warn!("retry_workflow_run: failed for run_id={run_id}: {e}");
1756 (
1757 StatusCode::BAD_GATEWAY,
1758 Json(serde_json::json!({ "error": format!("Failed to retry workflow: {e}") })),
1759 )
1760 .into_response()
1761 }
1762 }
1763}
1764
1765#[derive(Deserialize)]
1766pub struct RetryWorkflowBody {
1767 pub cwd: Option<String>,
1768}
1769
1770pub async fn handle_agent_activity(
1776 State(state): State<AppState>,
1777 headers: HeaderMap,
1778 Path(agent_id): Path<String>,
1779 Query(query): Query<AgentActivityQuery>,
1780) -> impl IntoResponse {
1781 if let Err(e) = require_auth(&state, &headers) {
1782 return e.into_response();
1783 }
1784
1785 let runlogs_dir =
1786 std::path::PathBuf::from(std::env::var("HOME").unwrap_or_else(|_| "/tmp".into()))
1787 .join(".construct/operator_mcp/runlogs");
1788 let path = runlogs_dir.join(format!("{agent_id}.jsonl"));
1789
1790 if !path.exists() {
1791 return (
1792 StatusCode::NOT_FOUND,
1793 Json(serde_json::json!({ "error": "No run log found for this agent" })),
1794 )
1795 .into_response();
1796 }
1797
1798 let content = match std::fs::read_to_string(&path) {
1799 Ok(c) => c,
1800 Err(e) => {
1801 return (
1802 StatusCode::INTERNAL_SERVER_ERROR,
1803 Json(serde_json::json!({ "error": format!("Failed to read log: {e}") })),
1804 )
1805 .into_response();
1806 }
1807 };
1808
1809 let view = query.view.as_deref().unwrap_or("summary");
1810 let limit = query.limit.unwrap_or(100).min(500) as usize;
1811
1812 let entries: Vec<serde_json::Value> = content
1813 .lines()
1814 .filter_map(|line| serde_json::from_str(line).ok())
1815 .collect();
1816
1817 match view {
1818 "tool_calls" => {
1819 let tools: Vec<&serde_json::Value> = entries
1821 .iter()
1822 .filter(|e| e.get("kind").and_then(|v| v.as_str()) == Some("tool_call"))
1823 .collect();
1824 let total = tools.len();
1825 let slice: Vec<_> = tools.into_iter().rev().take(limit).collect();
1826 Json(serde_json::json!({
1827 "agent_id": agent_id,
1828 "view": "tool_calls",
1829 "total": total,
1830 "entries": slice,
1831 }))
1832 .into_response()
1833 }
1834 "messages" => {
1835 let msgs: Vec<&serde_json::Value> = entries
1837 .iter()
1838 .filter(|e| {
1839 let kind = e.get("kind").and_then(|v| v.as_str()).unwrap_or("");
1840 kind == "message" || kind == "user_message"
1841 })
1842 .collect();
1843 let total = msgs.len();
1844 let slice: Vec<_> = msgs.into_iter().rev().take(limit).collect();
1845 Json(serde_json::json!({
1846 "agent_id": agent_id,
1847 "view": "messages",
1848 "total": total,
1849 "entries": slice,
1850 }))
1851 .into_response()
1852 }
1853 "errors" => {
1854 let errs: Vec<&serde_json::Value> = entries
1855 .iter()
1856 .filter(|e| {
1857 let kind = e.get("kind").and_then(|v| v.as_str()).unwrap_or("");
1858 kind == "error"
1859 || kind == "turn_failed"
1860 || e.get("status").and_then(|v| v.as_str()) == Some("failed")
1861 })
1862 .collect();
1863 Json(serde_json::json!({
1864 "agent_id": agent_id,
1865 "view": "errors",
1866 "total": errs.len(),
1867 "entries": errs,
1868 }))
1869 .into_response()
1870 }
1871 "full" => {
1872 let total = entries.len();
1874 let slice: Vec<_> = entries.into_iter().rev().take(limit).collect();
1875 Json(serde_json::json!({
1876 "agent_id": agent_id,
1877 "view": "full",
1878 "total": total,
1879 "entries": slice,
1880 }))
1881 .into_response()
1882 }
1883 _ => {
1884 let header = entries.first().cloned().unwrap_or_default();
1886 let tool_count = entries
1887 .iter()
1888 .filter(|e| e.get("kind").and_then(|v| v.as_str()) == Some("tool_call"))
1889 .count();
1890 let error_count = entries
1891 .iter()
1892 .filter(|e| {
1893 let kind = e.get("kind").and_then(|v| v.as_str()).unwrap_or("");
1894 kind == "error" || kind == "turn_failed"
1895 })
1896 .count();
1897 let last_message = entries
1898 .iter()
1899 .rev()
1900 .find(|e| e.get("kind").and_then(|v| v.as_str()) == Some("message"))
1901 .and_then(|e| e.get("text").and_then(|v| v.as_str()))
1902 .unwrap_or("");
1903 let last_msg_truncated = if last_message.len() > 5000 {
1905 &last_message[..5000]
1906 } else {
1907 last_message
1908 };
1909 let recent_tools: Vec<_> = entries
1911 .iter()
1912 .filter(|e| e.get("kind").and_then(|v| v.as_str()) == Some("tool_call"))
1913 .rev()
1914 .take(20)
1915 .cloned()
1916 .collect();
1917 let mut input_tokens: u64 = 0;
1919 let mut output_tokens: u64 = 0;
1920 let mut total_cost: f64 = 0.0;
1921 for e in &entries {
1922 if e.get("kind").and_then(|v| v.as_str()) == Some("turn_completed") {
1923 if let Some(usage) = e.get("usage") {
1924 input_tokens += usage
1925 .get("inputTokens")
1926 .and_then(|v| v.as_u64())
1927 .unwrap_or(0);
1928 output_tokens += usage
1929 .get("outputTokens")
1930 .and_then(|v| v.as_u64())
1931 .unwrap_or(0);
1932 total_cost += usage
1933 .get("totalCostUsd")
1934 .and_then(|v| v.as_f64())
1935 .unwrap_or(0.0);
1936 }
1937 }
1938 }
1939 Json(serde_json::json!({
1940 "agent_id": agent_id,
1941 "view": "summary",
1942 "title": header.get("title").and_then(|v| v.as_str()).unwrap_or(""),
1943 "agent_type": header.get("agent_type").and_then(|v| v.as_str()).unwrap_or(""),
1944 "total_events": entries.len(),
1945 "tool_call_count": tool_count,
1946 "error_count": error_count,
1947 "last_message": last_msg_truncated,
1948 "recent_tools": recent_tools,
1949 "usage": {
1950 "input_tokens": input_tokens,
1951 "output_tokens": output_tokens,
1952 "total_cost_usd": total_cost,
1953 },
1954 }))
1955 .into_response()
1956 }
1957 }
1958}
1959
1960#[derive(Deserialize)]
1961pub struct AgentActivityQuery {
1962 view: Option<String>,
1963 limit: Option<u32>,
1964}
1965
1966pub async fn handle_workflow_dashboard(
1968 State(state): State<AppState>,
1969 headers: HeaderMap,
1970) -> impl IntoResponse {
1971 if let Err(e) = require_auth(&state, &headers) {
1972 return e.into_response();
1973 }
1974
1975 let client = build_kumiho_client(&state);
1976 let space_path = workflow_space_path(&state);
1977 let runs_space = workflow_runs_space_path(&state);
1978
1979 let definitions = match client.list_items(&space_path, false).await {
1981 Ok(items) => merge_with_builtins(enrich_items(&client, items).await),
1982 Err(_) => merge_with_builtins(Vec::new()),
1983 };
1984 let definitions_count = definitions.len();
1985
1986 let (recent_runs, total_runs) = match client.list_items(&runs_space, false).await {
1988 Ok(mut items) => {
1989 let total = items.len();
1990 items.sort_by(|a, b| {
1991 let a_time = a.created_at.as_deref().unwrap_or("");
1992 let b_time = b.created_at.as_deref().unwrap_or("");
1993 b_time.cmp(a_time)
1994 });
1995 items.truncate(5);
1996
1997 let krefs: Vec<String> = items.iter().map(|i| i.kref.clone()).collect();
1998 let rev_map = client
1999 .batch_get_revisions(&krefs, "latest")
2000 .await
2001 .unwrap_or_default();
2002
2003 let runs: Vec<WorkflowRunSummary> = items
2004 .iter()
2005 .map(|item| to_run_summary(item, rev_map.get(&item.kref)))
2006 .collect();
2007
2008 (runs, total)
2009 }
2010 Err(_) => (Vec::new(), 0),
2011 };
2012
2013 let active_runs = recent_runs
2014 .iter()
2015 .filter(|r| r.status == "running" || r.status == "paused")
2016 .count();
2017
2018 let dashboard = WorkflowDashboard {
2019 definitions_count,
2020 definitions,
2021 active_runs,
2022 recent_runs,
2023 total_runs,
2024 };
2025
2026 Json(serde_json::json!({ "dashboard": dashboard })).into_response()
2027}