1use rmcp::handler::server::tool::ToolRouter;
7use rmcp::handler::server::wrapper::Parameters;
8use rmcp::model::{CallToolResult, Content, ServerCapabilities, ServerInfo};
9use rmcp::{tool, tool_handler, tool_router, ServerHandler};
10use schemars::JsonSchema;
11use serde::Deserialize;
12use tokio::process::Command as TokioCommand;
13use tokio::time::{timeout, Duration};
14
15#[derive(Clone)]
17pub struct NikaMcpServer {
18 tool_router: ToolRouter<Self>,
19}
20
21impl Default for NikaMcpServer {
22 fn default() -> Self {
23 Self::new()
24 }
25}
26
27#[derive(Debug, Deserialize, JsonSchema)]
29pub struct CheckParams {
30 pub path: String,
32}
33
34#[derive(Debug, Deserialize, JsonSchema)]
36pub struct SchemaParams {
37 #[serde(default = "default_schema_version")]
39 pub version: String,
40}
41
/// Serde default for [`SchemaParams::version`]: the schema version string "0.12".
fn default_schema_version() -> String {
    String::from("0.12")
}
45
46#[derive(Debug, Deserialize, JsonSchema)]
48pub struct ErrorLookupParams {
49 pub code: String,
51}
52
53#[tool_router]
54impl NikaMcpServer {
55 pub fn new() -> Self {
56 Self {
57 tool_router: Self::tool_router(),
58 }
59 }
60
61 #[tool(
63 name = "nika_check",
64 description = "Validate a Nika .nika.yaml workflow file. Returns validation errors with NIKA-XXX codes if invalid, or confirmation if valid. Use when editing or creating .nika.yaml files."
65 )]
66 async fn check(
67 &self,
68 Parameters(params): Parameters<CheckParams>,
69 ) -> Result<CallToolResult, rmcp::ErrorData> {
70 let canonical = match validate_workflow_path(¶ms.path) {
72 Ok(p) => p,
73 Err(e) => {
74 return Ok(CallToolResult::error(vec![Content::text(e)]));
75 }
76 };
77
78 let path_str = canonical.to_string_lossy().to_string();
79 let output = timeout(
80 Duration::from_secs(30),
81 TokioCommand::new("nika")
82 .args(["check", &path_str])
83 .output(),
84 )
85 .await
86 .map_err(|_| rmcp::ErrorData::internal_error("nika check timed out after 30s", None))?
87 .map_err(|e| {
88 rmcp::ErrorData::internal_error(format!("Failed to run nika check: {}", e), None)
89 })?;
90
91 let stdout = String::from_utf8_lossy(&output.stdout);
92 let stderr = String::from_utf8_lossy(&output.stderr);
93 if output.status.success() {
94 Ok(CallToolResult::success(vec![Content::text(format!(
95 "Valid: {}",
96 params.path
97 ))]))
98 } else {
99 Ok(CallToolResult::error(vec![Content::text(format!(
100 "Validation errors:\n{}\n{}",
101 stdout, stderr
102 ))]))
103 }
104 }
105
106 #[tool(
108 name = "nika_list_workflows",
109 description = "List all .nika.yaml workflow files in the project. Use to discover available workflows."
110 )]
111 async fn list_workflows(&self) -> Result<CallToolResult, rmcp::ErrorData> {
112 let workflows = tokio::task::spawn_blocking(|| {
114 let mut wf = Vec::new();
115 collect_workflows(std::path::Path::new("."), &mut wf, 0);
116 wf
117 })
118 .await
119 .unwrap_or_default();
120
121 if workflows.is_empty() {
122 Ok(CallToolResult::success(vec![Content::text(
123 "No .nika.yaml files found in current directory.",
124 )]))
125 } else {
126 Ok(CallToolResult::success(vec![Content::text(format!(
127 "Found {} workflow(s):\n{}",
128 workflows.len(),
129 workflows.join("\n")
130 ))]))
131 }
132 }
133
134 #[tool(
136 name = "nika_schema",
137 description = "Get the Nika workflow YAML schema reference. Returns the 5 verbs, all fields, binding syntax, and transform catalog."
138 )]
139 async fn schema(
140 &self,
141 Parameters(_params): Parameters<SchemaParams>,
142 ) -> Result<CallToolResult, rmcp::ErrorData> {
143 Ok(CallToolResult::success(vec![Content::text(SCHEMA_REF)]))
144 }
145
146 #[tool(
148 name = "nika_error_lookup",
149 description = "Look up a NIKA-XXX error code. Returns the error description, category, and how to fix it. Use when debugging workflow validation or runtime errors."
150 )]
151 async fn error_lookup(
152 &self,
153 Parameters(params): Parameters<ErrorLookupParams>,
154 ) -> Result<CallToolResult, rmcp::ErrorData> {
155 let code = params.code.to_uppercase().replace("NIKA-", "");
156 let num: u32 = code.parse().unwrap_or(999);
157
158 let (category, description) = match num {
159 0..=9 => ("Workflow", "Workflow structure error (schema, tasks)"),
160 10..=19 => (
161 "Schema/Validation",
162 "Schema validation error (task IDs, fields)",
163 ),
164 20..=29 => ("DAG", "DAG error (circular deps, missing deps)"),
165 30..=39 => ("Provider", "Provider error (API key, model not found)"),
166 40..=49 => ("Template/Binding", "Template or binding resolution error"),
167 50..=59 => ("Path/Security", "Path, task, or security error"),
168 60..=69 => ("Output", "JSON/schema validation error"),
169 90..=99 => ("Execution", "Runtime execution error"),
170 100..=109 => ("MCP", "MCP server/tool error"),
171 110..=119 => ("Agent", "Agent loop error"),
172 200..=219 => ("File Tools", "Builtin file tool error"),
173 250..=259 => ("Media", "Media pipeline error"),
174 270..=279 => ("Skills", "Skill file not found or invalid"),
175 280..=289 => ("Artifacts", "Artifact write/path error"),
176 290..=297 => (
177 "Media Tools",
178 "Builtin media tool error (import, decode, thumbnail, etc.)",
179 ),
180 300..=309 => ("Structured Output", "Structured output error"),
181 _ => ("Unknown", "Unknown error code"),
182 };
183
184 Ok(CallToolResult::success(vec![Content::text(format!(
185 "NIKA-{:03}: {}\nCategory: {}\nFix: Run `nika check <file>` for detailed diagnostics.",
186 num, description, category
187 ))]))
188 }
189}
190
191#[tool_handler]
192impl ServerHandler for NikaMcpServer {
193 fn get_info(&self) -> ServerInfo {
194 ServerInfo {
195 instructions: Some(
196 "Nika workflow engine MCP server. Validate, list, and explore .nika.yaml workflows."
197 .into(),
198 ),
199 capabilities: ServerCapabilities::builder()
200 .enable_tools()
201 .build(),
202 ..Default::default()
203 }
204 }
205}
206
207pub async fn run_server() -> Result<(), Box<dyn std::error::Error>> {
209 use rmcp::transport::stdio;
210 use rmcp::ServiceExt;
211
212 let handler = NikaMcpServer::new();
213 let server = handler.serve(stdio()).await?;
214 server.waiting().await?;
215 Ok(())
216}
217
/// Upper bound on the number of workflow paths a single scan may collect.
const MAX_WORKFLOW_RESULTS: usize = 500;

/// Recursively collects paths of files whose name ends in `.nika.yaml`.
///
/// * `dir` - directory to scan.
/// * `results` - accumulator; matching paths are appended in display form.
/// * `depth` - current recursion depth; directories deeper than 5 are ignored.
///
/// Hidden directories (leading `.`), `target` and `node_modules` are not
/// descended into. Unreadable directories and entries are skipped silently
/// (best-effort scan). Collection stops once `results` holds
/// `MAX_WORKFLOW_RESULTS` entries.
///
/// Entries are visited in file-name order. `read_dir` yields them in a
/// platform- and filesystem-dependent order, which would otherwise make both
/// the listing order and the set of entries surviving the cap unstable.
fn collect_workflows(dir: &std::path::Path, results: &mut Vec<String>, depth: usize) {
    const MAX_DEPTH: usize = 5;

    if depth > MAX_DEPTH || results.len() >= MAX_WORKFLOW_RESULTS {
        return;
    }

    let mut entries: Vec<std::fs::DirEntry> = match std::fs::read_dir(dir) {
        Ok(iter) => iter.flatten().collect(),
        Err(_) => return,
    };
    entries.sort_by_key(|entry| entry.file_name());

    for entry in entries {
        if results.len() >= MAX_WORKFLOW_RESULTS {
            return;
        }

        // `file_type` does not follow symlinks, so symlinked directories are not descended.
        let is_dir = entry.file_type().map(|ft| ft.is_dir()).unwrap_or(false);
        let name = entry.file_name();
        let name = name.to_string_lossy();

        if is_dir {
            let skipped = name.starts_with('.') || matches!(&*name, "target" | "node_modules");
            if !skipped {
                collect_workflows(&entry.path(), results, depth + 1);
            }
        } else if name.ends_with(".nika.yaml") {
            results.push(entry.path().display().to_string());
        }
    }
}
251
/// Vets a user-supplied workflow path and returns its canonical form.
///
/// The path is resolved against the current working directory. It is accepted
/// only when all of the following hold:
///
/// 1. the supplied string ends in `.nika.yaml`;
/// 2. the path exists and canonicalizes (symlinks resolved);
/// 3. the canonical path lies inside the canonical working directory;
/// 4. the canonical target itself is named `*.nika.yaml` — otherwise a symlink
///    called `x.nika.yaml` could smuggle in an arbitrary file from inside cwd;
/// 5. the target is a regular file, not a directory.
///
/// # Errors
///
/// Returns a human-readable message describing the first check that failed.
fn validate_workflow_path(user_path: &str) -> Result<std::path::PathBuf, String> {
    const WORKFLOW_SUFFIX: &str = ".nika.yaml";

    if !user_path.ends_with(WORKFLOW_SUFFIX) {
        return Err("Only .nika.yaml files can be validated".to_string());
    }

    let cwd =
        std::env::current_dir().map_err(|e| format!("Cannot determine working directory: {e}"))?;
    let canonical_cwd = cwd
        .canonicalize()
        .map_err(|e| format!("Cannot canonicalize cwd: {e}"))?;

    let canonical = cwd
        .join(user_path)
        .canonicalize()
        .map_err(|_| format!("File not found: {user_path}"))?;

    // Containment is checked before anything else is revealed about the target.
    if !canonical.starts_with(&canonical_cwd) {
        return Err(format!("Path traversal blocked: {user_path}"));
    }

    // Re-check the suffix on the resolved target: the first check only saw the
    // user's spelling, which a symlink can make arbitrary.
    let target_is_workflow = canonical
        .file_name()
        .map_or(false, |name| name.to_string_lossy().ends_with(WORKFLOW_SUFFIX));
    if !target_is_workflow {
        return Err("Only .nika.yaml files can be validated".to_string());
    }

    if !canonical.is_file() {
        return Err(format!("Not a regular file: {user_path}"));
    }

    Ok(canonical)
}
276
/// Schema reference text returned verbatim by the `nika_schema` tool.
const SCHEMA_REF: &str = r#"# Nika Workflow Schema (v0.12)

## 5 Verbs
- infer: { prompt, system, temperature, max_tokens, content, extended_thinking, thinking_budget, response_format, guardrails }
- exec: { command, shell, cwd, env, timeout }
- fetch: { url, method, headers, body/json, extract, selector, response, follow_redirects, timeout }
- invoke: { tool, mcp, params, resource, timeout }
- agent: { prompt, system, tools, mcp, max_turns, max_tokens, token_budget, from, skills, guardrails, completion, limits, extended_thinking }

## Task Fields
id, description, provider, model, preset, with, depends_on, output, for_each, as, concurrency, fail_fast, retry, timeout, structured, artifact, record, context_budget, routing, log, on_error, when

## Workflow Fields
schema, workflow, description, provider, model, inputs, context, include, mcp, agents, skills, artifacts, goal, orchestrate, log, tasks

## Bindings
with: { alias: $task_id } → {{with.alias}}
Path: $task.data.field | Defaults: $task.path ?? "fallback" | Env: $env.API_KEY

## 50 Transforms
String: upper, lower, trim, trim_start, trim_end, length, to_string
Array: first, last, flatten, reverse, sort, unique, compact, keys, values
Numeric: to_number, round, abs, ceil, floor
Type: to_bool, to_json, parse_json, type_of
Parametric: join(sep), split(sep), default(val), slice(start, end)
Query: pluck(field), where(field, val), pick(f1, f2), omit(f1, f2), sort_by(field), group_by(field), merge, regex(pattern)
String test: starts_with(str), ends_with(str), contains(str)
URL: url_host, url_path, url_without_query, url_normalize
Encoding: base64_encode, base64_decode, content_hash, unique_urls
JQ: jq(expr) — full jq stdlib via jaq-core
System: shell (escape for shell: true commands)

## Extract Modes (fetch:)
markdown, article, text, selector, metadata, links, jsonpath, feed, llm_txt

## 62 Builtin Tools (nika:*)
Core (7): sleep, log, emit, assert, prompt, run, complete
File (5): read, write, edit, glob, grep
Introspection (6): dag_info, task_status, threads, orchestrate, cost, records
Data (13): json_merge, set_diff, zip, map, filter, group_by, chunk, token_count, enrich, jq, tree_data, inject, json_query (deprecated → jq)
Data Sprint 2 (6): json_verify, yaml_validate, locale_lookup, aggregate, json_flatten, json_unflatten
Media always-on (5): import, decode, dimensions, thumbhash, dominant_color
Media core (3): thumbnail, convert, strip
Media opt-in (17): metadata, optimize, svg_render, chart, phash, compare, pdf_extract, provenance, verify, qr_validate, quality, html_to_md, css_select, extract_metadata, extract_links, readability, pipeline

## Providers
anthropic (claude), openai (gpt), mistral, groq, deepseek (deep-seek), gemini (google), xai (grok), native (local), mock
Fallback: provider: [groq, claude, openai]
"#;
326
#[cfg(test)]
mod tests {
    use super::*;

    // ── validate_workflow_path ──────────────────────────────────────────

    #[test]
    fn test_rejects_non_nika_extension() {
        let result = validate_workflow_path("/etc/passwd");
        assert!(result.is_err());
        assert!(
            result.unwrap_err().contains("Only .nika.yaml"),
            "Should reject non-.nika.yaml files"
        );
    }

    #[test]
    fn test_rejects_yaml_without_nika_prefix() {
        let result = validate_workflow_path("workflow.yaml");
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("Only .nika.yaml"));
    }

    #[test]
    fn test_rejects_path_traversal() {
        let result = validate_workflow_path("../../../etc/shadow.nika.yaml");
        assert!(result.is_err());
    }

    /// A real file inside the working directory must be accepted and returned
    /// in canonical form. The temp dir is created inside cwd so the acceptance
    /// path is actually exercised.
    #[test]
    fn test_accepts_valid_nika_yaml_in_cwd() {
        let cwd = std::env::current_dir().unwrap();
        let tmpdir = tempfile::tempdir_in(&cwd).unwrap();
        let test_file = tmpdir.path().join("valid.nika.yaml");
        std::fs::write(&test_file, "schema: nika/workflow@0.12\ntasks: []").unwrap();

        let relative = test_file
            .strip_prefix(&cwd)
            .unwrap()
            .to_string_lossy()
            .to_string();
        let canonical = validate_workflow_path(&relative)
            .expect("a .nika.yaml file inside cwd should be accepted");
        assert_eq!(canonical, test_file.canonicalize().unwrap());
    }

    /// An existing `.nika.yaml` file outside the working directory must be
    /// blocked by the containment check (not merely fail as "not found").
    #[test]
    fn test_rejects_existing_file_outside_cwd() {
        let tmpdir = tempfile::tempdir().unwrap();
        let test_file = tmpdir.path().join("valid.nika.yaml");
        std::fs::write(&test_file, "schema: nika/workflow@0.12\ntasks: []").unwrap();

        let cwd = std::env::current_dir().unwrap().canonicalize().unwrap();
        // The system temp dir is normally outside cwd; handle the unusual
        // configuration where it is not, instead of asserting something false.
        let inside_cwd = tmpdir.path().canonicalize().unwrap().starts_with(&cwd);

        let result = validate_workflow_path(&test_file.to_string_lossy());
        if inside_cwd {
            assert!(result.is_ok(), "File inside cwd should be accepted: {:?}", result);
        } else {
            assert!(
                result
                    .as_ref()
                    .unwrap_err()
                    .contains("Path traversal blocked"),
                "Existing file outside cwd should be blocked, got: {:?}",
                result
            );
        }
    }

    #[test]
    fn test_rejects_absolute_path_outside_cwd() {
        let result = validate_workflow_path("/tmp/evil.nika.yaml");
        assert!(result.is_err());
    }

    // ── collect_workflows ───────────────────────────────────────────────

    #[test]
    fn test_collect_workflows_respects_max_results() {
        let mut results = Vec::new();
        collect_workflows(std::path::Path::new("."), &mut results, 0);
        assert!(
            results.len() <= MAX_WORKFLOW_RESULTS,
            "Should not exceed {} results, got {}",
            MAX_WORKFLOW_RESULTS,
            results.len()
        );
    }

    #[test]
    fn test_collect_workflows_skips_hidden_dirs() {
        let mut results = Vec::new();
        collect_workflows(std::path::Path::new("."), &mut results, 0);
        for r in &results {
            assert!(
                !r.contains("/."),
                "Should not include files from hidden dirs: {}",
                r
            );
        }
    }

    #[test]
    fn test_collect_workflows_respects_depth_limit() {
        let mut results = Vec::new();
        // Depth 6 is past the limit of 5, so the scan must return immediately.
        collect_workflows(std::path::Path::new("."), &mut results, 6);
        assert!(results.is_empty(), "Depth 6 should return no results");
    }

    // ── error_lookup ────────────────────────────────────────────────────

    #[tokio::test]
    async fn test_error_lookup_returns_category() {
        let server = NikaMcpServer::new();
        let result = server
            .error_lookup(Parameters(ErrorLookupParams {
                code: "NIKA-040".to_string(),
            }))
            .await
            .unwrap();
        // The Debug rendering includes the text content of the result.
        let text = format!("{:?}", result);
        assert!(
            text.contains("Template") || text.contains("Binding"),
            "NIKA-040 should be Template/Binding category"
        );
    }

    #[tokio::test]
    async fn test_error_lookup_handles_invalid_code() {
        let server = NikaMcpServer::new();
        let result = server
            .error_lookup(Parameters(ErrorLookupParams {
                code: "not-a-code".to_string(),
            }))
            .await
            .unwrap();
        let text = format!("{:?}", result);
        assert!(
            text.contains("Unknown"),
            "Invalid code should return Unknown"
        );
    }
}