1use super::BuiltinTool;
33use crate::ast::parse_analyzed;
34use crate::error::NikaError;
35use crate::runtime::Runner;
36use serde::{Deserialize, Serialize};
37use serde_json::Value;
38use std::cell::Cell;
39use std::future::Future;
40use std::path::Path;
41use std::pin::Pin;
42use std::time::{Duration, Instant};
43
44tokio::task_local! {
45 static WORKFLOW_DEPTH: Cell<u32>;
49}
50
51fn current_depth() -> u32 {
53 WORKFLOW_DEPTH.try_with(|d| d.get()).unwrap_or(0)
54}
55
56const MAX_ALLOWED_DEPTH: u32 = 10;
58
59const MAX_TIMEOUT_SECS: u64 = 3600;
61
62#[derive(Debug, Clone, Deserialize)]
65pub struct RunParams {
66 pub workflow: String,
68 #[serde(default)]
70 pub context_json: Option<String>,
71 #[serde(default)]
73 pub context: Option<Value>,
74 #[serde(default = "default_timeout")]
76 pub timeout_secs: u64,
77 #[serde(default = "default_max_depth")]
79 pub max_depth: u32,
80}
81
82fn default_timeout() -> u64 {
83 300
84}
85
86fn default_max_depth() -> u32 {
87 3
88}
89
90impl RunParams {
91 pub fn get_context(&self) -> Result<Option<Value>, NikaError> {
93 if let Some(ref json_str) = self.context_json {
94 let value =
95 serde_json::from_str(json_str).map_err(|e| NikaError::BuiltinInvalidParams {
96 tool: "nika:run".into(),
97 reason: format!("Invalid context_json: {}", e),
98 })?;
99 Ok(Some(value))
100 } else {
101 Ok(self.context.clone())
102 }
103 }
104}
105
106#[derive(Debug, Clone, Serialize)]
108pub struct RunResponse {
109 pub executed: bool,
111 pub workflow: String,
113 pub output: Value,
115 pub duration_ms: u64,
117 pub depth: u32,
119}
120
121#[derive(Default)]
131pub struct RunTool;
132
133impl BuiltinTool for RunTool {
134 fn name(&self) -> &'static str {
135 "run"
136 }
137
138 fn description(&self) -> &'static str {
139 "Execute nested workflow and return its output"
140 }
141
142 fn parameters_schema(&self) -> serde_json::Value {
143 serde_json::json!({
145 "type": "object",
146 "properties": {
147 "workflow": {
148 "type": "string",
149 "description": "Path to the workflow file to execute"
150 },
151 "context_json": {
152 "type": "string",
153 "description": "Context as JSON string (for OpenAI: '{\"key\": \"value\"}')"
154 },
155 "timeout_secs": {
156 "type": "integer",
157 "description": "Execution timeout in seconds (default: 300, max: 3600)"
158 },
159 "max_depth": {
160 "type": "integer",
161 "description": "Maximum recursion depth (default: 3, max: 10)"
162 }
163 },
164 "required": ["workflow"],
165 "additionalProperties": false
166 })
167 }
168
169 fn call<'a>(
170 &'a self,
171 args: String,
172 ) -> Pin<Box<dyn Future<Output = Result<String, NikaError>> + Send + 'a>> {
173 Box::pin(async move {
174 let start = Instant::now();
175
176 let params: RunParams =
178 serde_json::from_str(&args).map_err(|e| NikaError::BuiltinInvalidParams {
179 tool: "nika_run".into(),
180 reason: format!("Invalid JSON parameters: {}", e),
181 })?;
182
183 if params.workflow.is_empty() {
185 return Err(NikaError::BuiltinInvalidParams {
186 tool: "nika_run".into(),
187 reason: "Workflow path cannot be empty".into(),
188 });
189 }
190
191 if !params.workflow.ends_with(".nika.yaml") && !params.workflow.ends_with(".nika.yml") {
193 return Err(NikaError::BuiltinInvalidParams {
194 tool: "nika_run".into(),
195 reason: format!(
196 "Workflow path must have .nika.yaml or .nika.yml extension: '{}'",
197 params.workflow
198 ),
199 });
200 }
201
202 let max_depth = params.max_depth.clamp(1, MAX_ALLOWED_DEPTH);
205 let timeout_secs = params.timeout_secs.clamp(1, MAX_TIMEOUT_SECS);
206
207 let depth = current_depth();
209 if depth >= max_depth {
210 return Err(NikaError::BuiltinToolError {
211 tool: "nika_run".into(),
212 reason: format!(
213 "Maximum recursion depth ({}) reached at depth {}. Cannot execute nested workflow '{}'.",
214 max_depth, depth, params.workflow
215 ),
216 });
217 }
218
219 let next_depth = depth + 1;
220 let timeout_duration = Duration::from_secs(timeout_secs);
221
222 let workflow_path = Path::new(¶ms.workflow);
224 let canonical_path =
225 workflow_path
226 .canonicalize()
227 .map_err(|e| NikaError::BuiltinToolError {
228 tool: "nika_run".into(),
229 reason: format!(
230 "Failed to resolve workflow path '{}': {}",
231 params.workflow, e
232 ),
233 })?;
234
235 tracing::debug!(
238 target: "nika_run",
239 original = %params.workflow,
240 canonical = %canonical_path.display(),
241 "Resolved workflow path"
242 );
243
244 let yaml_content = tokio::time::timeout(
247 Duration::from_secs(30), tokio::fs::read_to_string(&canonical_path),
249 )
250 .await
251 .map_err(|_| NikaError::BuiltinToolError {
252 tool: "nika_run".into(),
253 reason: format!(
254 "Timed out reading workflow file '{}' after 30 seconds",
255 params.workflow
256 ),
257 })?
258 .map_err(|e| NikaError::BuiltinToolError {
259 tool: "nika_run".into(),
260 reason: format!("Failed to read workflow file: {}", e),
261 })?;
262
263 let workflow =
264 parse_analyzed(&yaml_content).map_err(|e| NikaError::BuiltinToolError {
265 tool: "nika_run".into(),
266 reason: format!("Failed to parse workflow YAML: {}", e),
267 })?;
268
269 tracing::info!(
270 target: "nika_run",
271 workflow = %params.workflow,
272 depth = next_depth,
273 max_depth = max_depth,
274 timeout_secs = timeout_secs,
275 has_context = params.context.is_some() || params.context_json.is_some(),
276 "Executing nested workflow"
277 );
278
279 let mut runner = Runner::new(workflow)?.quiet();
281
282 if let Some(context) = params.get_context()? {
284 runner = runner.with_initial_context("__parent_context__", context);
285 }
286
287 let execution_result = WORKFLOW_DEPTH
290 .scope(Cell::new(next_depth), async {
291 tokio::time::timeout(timeout_duration, runner.run()).await
292 })
293 .await;
294
295 let duration_ms = start.elapsed().as_millis() as u64;
296
297 let result = match execution_result {
299 Ok(Ok(output)) => output,
300 Ok(Err(e)) => {
301 return Err(NikaError::BuiltinToolError {
302 tool: "nika_run".into(),
303 reason: format!("Workflow execution failed: {}", e),
304 });
305 }
306 Err(_) => {
307 return Err(NikaError::BuiltinToolError {
308 tool: "nika_run".into(),
309 reason: format!(
310 "Workflow execution timed out after {} seconds",
311 timeout_secs
312 ),
313 });
314 }
315 };
316
317 let response = RunResponse {
319 executed: true,
320 workflow: params.workflow,
321 output: serde_json::json!({
322 "status": "completed",
323 "result": result
324 }),
325 duration_ms,
326 depth: next_depth,
327 };
328
329 serde_json::to_string(&response).map_err(|e| NikaError::BuiltinToolError {
330 tool: "nika_run".into(),
331 reason: format!("Failed to serialize response: {}", e),
332 })
333 })
334 }
335}
336
337#[cfg(test)]
338mod tests {
339 use super::*;
340
341 #[test]
342 fn test_run_tool_name() {
343 let tool = RunTool;
344 assert_eq!(tool.name(), "run");
345 }
346
347 #[test]
348 fn test_run_tool_description() {
349 let tool = RunTool;
350 assert!(tool.description().contains("workflow"));
351 }
352
353 #[test]
354 fn test_run_tool_schema() {
355 let tool = RunTool;
356 let schema = tool.parameters_schema();
357 assert_eq!(schema["type"], "object");
358 assert!(schema["properties"]["workflow"].is_object());
359 assert!(schema["properties"]["context_json"].is_object());
361 assert!(schema["properties"]["timeout_secs"].is_object());
363 assert!(schema["properties"]["max_depth"].is_object());
364 assert_eq!(schema["additionalProperties"], false);
365 assert!(schema["required"]
366 .as_array()
367 .unwrap()
368 .contains(&serde_json::json!("workflow")));
369 }
370
371 #[tokio::test]
372 async fn test_run_nonexistent_file_errors() {
373 let tool = RunTool;
374 let result = tool
375 .call(r#"{"workflow": "path/to/workflow.nika.yaml"}"#.to_string())
376 .await;
377
378 assert!(result.is_err());
379 let err = result.unwrap_err();
380 assert!(
382 err.to_string().contains("resolve workflow path")
383 || err.to_string().contains("not found")
384 );
385 }
386
387 #[tokio::test]
388 async fn test_run_executes_real_workflow() {
389 use std::io::Write;
390 use tempfile::NamedTempFile;
391
392 let mut temp_file = NamedTempFile::with_suffix(".nika.yaml").unwrap();
394 writeln!(
395 temp_file,
396 r#"schema: nika/workflow@0.12
397workflow: test-workflow
398tasks:
399 - id: hello
400 exec: "echo hello""#
401 )
402 .unwrap();
403
404 let tool = RunTool;
405 let result = tool
406 .call(format!(
407 r#"{{"workflow": "{}"}}"#,
408 temp_file.path().display()
409 ))
410 .await;
411
412 assert!(result.is_ok());
413 let response: serde_json::Value = serde_json::from_str(&result.unwrap()).unwrap();
414 assert_eq!(response["executed"], true);
415 }
416
417 #[tokio::test]
418 async fn test_run_returns_workflow_output() {
419 use std::io::Write;
420 use tempfile::NamedTempFile;
421
422 let mut temp_file = NamedTempFile::with_suffix(".nika.yaml").unwrap();
424 writeln!(
425 temp_file,
426 r#"schema: nika/workflow@0.12
427workflow: test-output
428tasks:
429 - id: greet
430 exec: "echo world""#
431 )
432 .unwrap();
433
434 let tool = RunTool;
435 let result = tool
436 .call(format!(
437 r#"{{"workflow": "{}"}}"#,
438 temp_file.path().display()
439 ))
440 .await;
441
442 assert!(result.is_ok());
443 let response: serde_json::Value = serde_json::from_str(&result.unwrap()).unwrap();
444 assert_eq!(response["executed"], true);
445 assert!(response["output"].is_object());
446 }
447
448 #[tokio::test]
449 async fn test_run_empty_path_errors() {
450 let tool = RunTool;
451 let result = tool.call(r#"{"workflow": ""}"#.to_string()).await;
452
453 assert!(result.is_err());
454 let err = result.unwrap_err();
455 assert!(err.to_string().contains("cannot be empty"));
456 }
457
458 #[tokio::test]
459 async fn test_run_invalid_extension_errors() {
460 let tool = RunTool;
461 let result = tool
462 .call(r#"{"workflow": "workflow.yaml"}"#.to_string())
463 .await;
464
465 assert!(result.is_err());
466 let err = result.unwrap_err();
467 assert!(err.to_string().contains(".nika.yaml"));
468 }
469
470 #[tokio::test]
471 async fn test_run_accepts_yml_extension() {
472 use std::io::Write;
473 use tempfile::NamedTempFile;
474
475 let mut temp_file = NamedTempFile::with_suffix(".nika.yml").unwrap();
477 writeln!(
478 temp_file,
479 r#"schema: nika/workflow@0.12
480workflow: test-yml
481tasks:
482 - id: test
483 exec: "echo yml""#
484 )
485 .unwrap();
486
487 let tool = RunTool;
488 let result = tool
489 .call(format!(
490 r#"{{"workflow": "{}"}}"#,
491 temp_file.path().display()
492 ))
493 .await;
494
495 assert!(result.is_ok());
496 }
497
498 #[tokio::test]
499 async fn test_run_invalid_json() {
500 let tool = RunTool;
501 let result = tool.call("not json".to_string()).await;
502
503 assert!(result.is_err());
504 let err = result.unwrap_err();
505 assert!(err.to_string().contains("Invalid JSON parameters"));
506 }
507
508 #[tokio::test]
509 async fn test_run_missing_workflow() {
510 let tool = RunTool;
511 let result = tool.call(r#"{"context": {"test": 1}}"#.to_string()).await;
512
513 assert!(result.is_err());
514 let err = result.unwrap_err();
515 assert!(err.to_string().contains("Invalid JSON parameters"));
516 }
517
518 #[tokio::test]
519 async fn test_run_params_deserialization() {
520 let json = r#"{"workflow": "test.nika.yaml", "context": {"key": "value"}}"#;
521 let params: RunParams = serde_json::from_str(json).unwrap();
522
523 assert_eq!(params.workflow, "test.nika.yaml");
524 assert!(params.context.is_some());
525 assert_eq!(params.context.as_ref().unwrap()["key"], "value");
526 assert_eq!(params.timeout_secs, 300);
528 assert_eq!(params.max_depth, 3);
529 }
530
531 #[tokio::test]
532 async fn test_run_params_without_context() {
533 let json = r#"{"workflow": "test.nika.yaml"}"#;
534 let params: RunParams = serde_json::from_str(json).unwrap();
535
536 assert_eq!(params.workflow, "test.nika.yaml");
537 assert!(params.context.is_none());
538 assert_eq!(params.timeout_secs, 300);
540 assert_eq!(params.max_depth, 3);
541 }
542
543 #[tokio::test]
544 async fn test_run_params_custom_timeout_and_depth() {
545 let json = r#"{"workflow": "test.nika.yaml", "timeout_secs": 60, "max_depth": 5}"#;
546 let params: RunParams = serde_json::from_str(json).unwrap();
547
548 assert_eq!(params.workflow, "test.nika.yaml");
549 assert_eq!(params.timeout_secs, 60);
550 assert_eq!(params.max_depth, 5);
551 }
552
553 #[tokio::test]
554 async fn test_run_response_includes_duration_and_depth() {
555 use std::io::Write;
556 use tempfile::NamedTempFile;
557
558 let mut temp_file = NamedTempFile::with_suffix(".nika.yaml").unwrap();
560 writeln!(
561 temp_file,
562 r#"schema: nika/workflow@0.12
563workflow: test-response
564tasks:
565 - id: test
566 exec: "echo test""#
567 )
568 .unwrap();
569
570 let tool = RunTool;
571 let result = tool
572 .call(format!(
573 r#"{{"workflow": "{}"}}"#,
574 temp_file.path().display()
575 ))
576 .await;
577
578 assert!(result.is_ok());
579 let response: serde_json::Value = serde_json::from_str(&result.unwrap()).unwrap();
580 assert_eq!(response["executed"], true);
581 assert!(response["duration_ms"].is_number());
582 assert_eq!(response["depth"], 1);
583 }
584
585 #[test]
586 fn test_max_depth_constant() {
587 assert_eq!(MAX_ALLOWED_DEPTH, 10);
588 }
589
590 #[test]
591 fn test_max_timeout_constant() {
592 assert_eq!(MAX_TIMEOUT_SECS, 3600);
593 }
594
595 #[test]
596 fn test_default_timeout() {
597 assert_eq!(default_timeout(), 300);
598 }
599
600 #[test]
601 fn test_default_max_depth() {
602 assert_eq!(default_max_depth(), 3);
603 }
604
605 #[tokio::test]
610 async fn test_current_depth_returns_zero_outside_scope() {
611 let depth = current_depth();
613 assert_eq!(depth, 0, "Outside scope should return 0");
614 }
615
616 #[tokio::test]
617 async fn test_current_depth_returns_value_inside_scope() {
618 let depth = WORKFLOW_DEPTH
620 .scope(Cell::new(5), async { current_depth() })
621 .await;
622 assert_eq!(depth, 5, "Inside scope should return set value");
623 }
624
625 #[tokio::test]
626 async fn test_depth_isolation_between_concurrent_tasks() {
627 use std::sync::Arc;
628 use tokio::sync::Barrier;
629
630 let barrier = Arc::new(Barrier::new(2));
632
633 let b1 = Arc::clone(&barrier);
634 let task1 = tokio::spawn(async move {
635 WORKFLOW_DEPTH
636 .scope(Cell::new(1), async {
637 b1.wait().await;
639 tokio::time::sleep(Duration::from_millis(10)).await;
640 current_depth()
641 })
642 .await
643 });
644
645 let b2 = Arc::clone(&barrier);
646 let task2 = tokio::spawn(async move {
647 WORKFLOW_DEPTH
648 .scope(Cell::new(99), async {
649 b2.wait().await;
651 tokio::time::sleep(Duration::from_millis(10)).await;
652 current_depth()
653 })
654 .await
655 });
656
657 let (result1, result2) = tokio::join!(task1, task2);
658 assert_eq!(result1.unwrap(), 1, "Task 1 should have depth 1");
659 assert_eq!(result2.unwrap(), 99, "Task 2 should have depth 99");
660 }
661
662 #[tokio::test]
663 async fn test_depth_nested_scopes() {
664 let inner_depth = WORKFLOW_DEPTH
666 .scope(Cell::new(1), async {
667 let outer = current_depth();
668 let inner = WORKFLOW_DEPTH
669 .scope(Cell::new(2), async { current_depth() })
670 .await;
671 (outer, inner)
672 })
673 .await;
674
675 assert_eq!(inner_depth.0, 1, "Outer scope should have depth 1");
676 assert_eq!(inner_depth.1, 2, "Inner scope should have depth 2");
677 }
678
679 #[tokio::test]
680 async fn test_depth_cleanup_on_panic() {
681 use std::panic::AssertUnwindSafe;
682
683 let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
685 tokio::runtime::Handle::current().block_on(async {
686 let _ = WORKFLOW_DEPTH
687 .scope(Cell::new(42), async {
688 panic!("test panic");
689 })
690 .await;
691 });
692 }));
693
694 assert!(result.is_err(), "Should have panicked");
695 let depth = current_depth();
697 assert_eq!(depth, 0, "Depth should be 0 after panic cleanup");
698 }
699
700 #[tokio::test]
705 async fn test_timeout_clamped_to_max() {
706 use std::io::Write;
707 use tempfile::NamedTempFile;
708
709 let mut temp_file = NamedTempFile::with_suffix(".nika.yaml").unwrap();
711 writeln!(
712 temp_file,
713 r#"schema: nika/workflow@0.12
714workflow: test-timeout-clamp
715tasks:
716 - id: test
717 exec: "echo done""#
718 )
719 .unwrap();
720
721 let tool = RunTool;
722 let result = tool
724 .call(format!(
725 r#"{{"workflow": "{}", "timeout_secs": 99999}}"#,
726 temp_file.path().display()
727 ))
728 .await;
729
730 assert!(result.is_ok(), "Should succeed with clamped timeout");
732 }
733
734 #[test]
739 fn test_run_params_get_context_from_context_field() {
740 let params = RunParams {
741 workflow: "test.nika.yaml".to_string(),
742 context_json: None,
743 context: Some(serde_json::json!({"key": "value"})),
744 timeout_secs: 300,
745 max_depth: 3,
746 };
747
748 let context = params.get_context().unwrap();
749 assert!(context.is_some());
750 assert_eq!(context.unwrap()["key"], "value");
751 }
752
753 #[test]
754 fn test_run_params_get_context_from_context_json() {
755 let params = RunParams {
756 workflow: "test.nika.yaml".to_string(),
757 context_json: Some(r#"{"from": "json"}"#.to_string()),
758 context: None,
759 timeout_secs: 300,
760 max_depth: 3,
761 };
762
763 let context = params.get_context().unwrap();
764 assert!(context.is_some());
765 assert_eq!(context.unwrap()["from"], "json");
766 }
767
768 #[test]
769 fn test_run_params_get_context_json_priority() {
770 let params = RunParams {
772 workflow: "test.nika.yaml".to_string(),
773 context_json: Some(r#"{"priority": "json"}"#.to_string()),
774 context: Some(serde_json::json!({"priority": "object"})),
775 timeout_secs: 300,
776 max_depth: 3,
777 };
778
779 let context = params.get_context().unwrap();
780 assert!(context.is_some());
781 assert_eq!(context.unwrap()["priority"], "json");
782 }
783
784 #[test]
785 fn test_run_params_get_context_invalid_json_errors() {
786 let params = RunParams {
787 workflow: "test.nika.yaml".to_string(),
788 context_json: Some("not valid json".to_string()),
789 context: None,
790 timeout_secs: 300,
791 max_depth: 3,
792 };
793
794 let result = params.get_context();
795 assert!(result.is_err());
796 assert!(result
797 .unwrap_err()
798 .to_string()
799 .contains("Invalid context_json"));
800 }
801
802 #[test]
803 fn test_run_params_get_context_none_when_both_empty() {
804 let params = RunParams {
805 workflow: "test.nika.yaml".to_string(),
806 context_json: None,
807 context: None,
808 timeout_secs: 300,
809 max_depth: 3,
810 };
811
812 let context = params.get_context().unwrap();
813 assert!(context.is_none());
814 }
815}