nika-engine 0.47.0

Nika workflow engine — embeddable runtime, provider, DAG, and binding logic
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
//! Task Executor - individual task execution
//!
//! Handles execution of individual tasks: infer, exec, fetch, invoke, agent.
//! Uses DashMap (a sharded concurrent map with per-shard locks) for provider caching.
//!
//! ## Module Organization
//! - `mod.rs`: TaskExecutor struct, constructors, dispatch, shared helpers
//! - `verbs.rs`: Shared helper functions (estimate_tokens, coerce_json_types, etc.)
//! - `infer.rs`: `run_infer` + `run_infer_vision` + guardrails
//! - `exec.rs`: `run_exec` (shell command execution)
//! - `fetch.rs`: `run_fetch` (HTTP requests)
//! - `invoke.rs`: `run_invoke` (MCP tool calls / resource reads)
//! - `agent.rs`: `run_agent` (multi-turn agentic loops)
//! - `decompose.rs`: Decompose expansion strategies (semantic, static, nested)

mod agent;
mod decompose;
mod exec;
mod extract;
mod fetch;
mod infer;
mod invoke;
#[cfg(test)]
mod tests;
#[cfg(test)]
mod tests_extract_e2e;
#[cfg(test)]
mod tests_extraction_e2e;
#[cfg(test)]
mod tests_wiremock;
mod verbs;

use parking_lot::RwLock;
use rustc_hash::FxHashMap;
use std::sync::Arc;

use dashmap::DashMap;
use tokio_util::sync::CancellationToken;
use tracing::{debug, instrument};

use crate::ast::output::{OutputFormat, OutputPolicy, SchemaRef};
use crate::ast::{McpConfigInline, TaskAction};
use crate::binding::ResolvedBindings;
use crate::error::NikaError;
use crate::event::{EventKind, EventLog};
use crate::mcp::{McpClient, McpClientPool};
use crate::media::CasStore;
use crate::provider::rig::RigProvider;
use crate::runtime::boot::PolicyConfig;
use crate::runtime::builtin::media::context::MediaToolContext;
use crate::runtime::policy::PolicyEnforcer;
use crate::runtime::BuiltinToolRouter;
use crate::runtime::SkillInjector;
use crate::store::RunContext;
use crate::tools::{PermissionMode, ToolContext};
use crate::util::{CONNECT_TIMEOUT, FETCH_TIMEOUT, REDIRECT_LIMIT};

/// Task executor with cached providers, shared HTTP client, and event logging
#[derive(Clone)]
pub struct TaskExecutor {
    /// Shared HTTP client (connection pooling)
    http_client: reqwest::Client,
    /// Cached rig-core providers
    rig_provider_cache: Arc<DashMap<String, RigProvider>>,
    /// Centralized MCP client pool
    ///
    /// Replaces the previous `mcp_client_cache` + `mcp_configs` pair.
    /// Handles lazy initialization, per-server deduplication via DashMap + OnceCell,
    /// and graceful shutdown. Shared across TaskExecutor, TUI App, and ChatAgent.
    mcp_pool: McpClientPool,
    /// Default provider name
    default_provider: Arc<str>,
    /// Default model
    default_model: Option<Arc<str>>,
    /// Event log for fine-grained audit trail
    event_log: EventLog,
    /// Router for builtin nika:* tools
    builtin_router: Arc<BuiltinToolRouter>,
    /// Policy enforcer for security checks
    policy_enforcer: Arc<parking_lot::RwLock<PolicyEnforcer>>,
    /// Cancellation token for aborting in-flight operations
    ///
    /// When cancelled, MCP invoke operations race against this token
    /// so they can abort promptly instead of waiting for INVOKE_TASK_DEADLINE.
    cancel_token: CancellationToken,
    /// CAS store for reading media blobs (used by vision content resolution)
    cas: Arc<CasStore>,
    /// Tool context for setting permission mode after construction
    tool_ctx: Arc<ToolContext>,
    /// Shared SkillInjector for loading and caching skill files
    skill_injector: Arc<SkillInjector>,
    /// Workflow-level skills mapping (alias -> file path)
    skills_map: std::collections::HashMap<String, String>,
    /// Base directory for resolving relative skill paths
    workflow_base_dir: std::path::PathBuf,
}

impl TaskExecutor {
    /// Create a new executor with default provider, model, MCP configs, and event log
    pub fn new(
        provider: &str,
        model: Option<&str>,
        mcp_configs: Option<FxHashMap<String, McpConfigInline>>,
        event_log: EventLog,
    ) -> Result<Self, NikaError> {
        Self::with_policy(provider, model, mcp_configs, event_log, None, None)
    }

    /// Create a new executor with explicit policy configuration.
    ///
    /// Returns an error if the media compute pool cannot be created.
    pub fn with_policy(
        provider: &str,
        model: Option<&str>,
        mcp_configs: Option<FxHashMap<String, McpConfigInline>>,
        event_log: EventLog,
        policy_config: Option<PolicyConfig>,
        permission_mode: Option<PermissionMode>,
    ) -> Result<Self, NikaError> {
        // SAFETY: ClientBuilder::build() only fails with custom TLS or proxy config.
        // We use defaults, so this is effectively infallible.
        //
        // Custom redirect policy: check each hop against SSRF blocklist to prevent
        // SSRF bypass via HTTP redirect (e.g., external → 169.254.169.254).
        let ssrf_redirect_policy = reqwest::redirect::Policy::custom(|attempt| {
            use crate::runtime::policy::is_ssrf_blocked;

            if attempt.previous().len() >= REDIRECT_LIMIT {
                attempt.stop()
            } else {
                let blocked = attempt.url().host_str().and_then(|host| {
                    let h = host.to_lowercase();
                    let h_normalized = h.trim_start_matches('[').trim_end_matches(']');
                    if is_ssrf_blocked(h_normalized) {
                        Some(h)
                    } else {
                        None
                    }
                });
                if let Some(host) = blocked {
                    attempt.error(std::io::Error::new(
                        std::io::ErrorKind::PermissionDenied,
                        format!("SSRF protection: redirect to '{}' blocked", host),
                    ))
                } else {
                    attempt.follow()
                }
            }
        });
        let http_client = reqwest::Client::builder()
            .timeout(FETCH_TIMEOUT)
            .connect_timeout(CONNECT_TIMEOUT)
            .redirect(ssrf_redirect_policy)
            .user_agent(format!("nika/{}", env!("CARGO_PKG_VERSION")))
            .build()
            .expect("HTTP client build with default TLS is infallible");

        let policy_enforcer = PolicyEnforcer::new(policy_config.unwrap_or_default());

        // Create ToolContext for file tools
        let working_dir = std::env::current_dir().unwrap_or_else(|_| {
            tracing::warn!("Failed to get current directory, using /tmp");
            std::path::PathBuf::from("/tmp")
        });
        let perm = permission_mode.unwrap_or(PermissionMode::Plan);
        tracing::debug!(?perm, "File tools using PermissionMode");
        let tool_ctx = Arc::new(ToolContext::new(working_dir.clone(), perm));

        // Create media tool context with CAS store at workspace default
        let media_ctx = Arc::new(MediaToolContext::new(CasStore::workspace_default(
            &working_dir,
        ))?);
        // Separate CAS handle for vision content resolution (same directory)
        let cas = Arc::new(CasStore::workspace_default(&working_dir));

        Ok(Self {
            http_client,
            rig_provider_cache: Arc::new(DashMap::new()),
            mcp_pool: McpClientPool::with_configs(
                event_log.clone(),
                mcp_configs.unwrap_or_default(),
            ),
            default_provider: provider.into(),
            default_model: model.map(Into::into),
            event_log,
            builtin_router: Arc::new(BuiltinToolRouter::with_all_tools(
                tool_ctx.clone(),
                media_ctx,
            )),
            policy_enforcer: Arc::new(RwLock::new(policy_enforcer)),
            cancel_token: CancellationToken::new(),
            cas,
            tool_ctx,
            skill_injector: Arc::new(SkillInjector::new()),
            skills_map: std::collections::HashMap::new(),
            workflow_base_dir: working_dir,
        })
    }

    /// Set the permission mode for file tools (nika:write, nika:edit, etc.)
    pub fn set_permission_mode(&self, mode: PermissionMode) {
        self.tool_ctx.set_permission_mode(mode);
    }

    /// Replace the cancellation token used to abort in-flight operations.
    ///
    /// Once `token` is cancelled, MCP invoke operations abort promptly instead of
    /// waiting for the full INVOKE_TASK_DEADLINE timeout.
    ///
    /// Builder-style: consumes the executor and returns it. Ignoring the return
    /// value drops the configured executor, so the result is `#[must_use]`.
    #[must_use = "with_cancel_token returns the configured executor; dropping it discards the token"]
    pub fn with_cancel_token(mut self, token: CancellationToken) -> Self {
        self.cancel_token = token;
        self
    }

    /// Report whether this executor's cancellation token has been cancelled.
    pub fn is_cancelled(&self) -> bool {
        let token = &self.cancel_token;
        token.is_cancelled()
    }

    /// Attach the workflow-level skills mapping used for agent skill injection.
    ///
    /// `skills_map` maps a skill alias to a file path; relative paths are resolved
    /// against `base_dir`. Agents that configure `skills:` get that skill content
    /// loaded via `SkillInjector` and prepended to their system prompt.
    ///
    /// Builder-style: consumes the executor and returns it. Ignoring the return
    /// value drops the configured executor, so the result is `#[must_use]`.
    #[must_use = "with_skills returns the configured executor; dropping it discards the skills"]
    pub fn with_skills(
        mut self,
        skills_map: std::collections::HashMap<String, String>,
        base_dir: std::path::PathBuf,
    ) -> Self {
        self.skills_map = skills_map;
        self.workflow_base_dir = base_dir;
        self
    }

    /// Register a mock MCP client under `name` (test builds only).
    ///
    /// Lets tests target an explicit mock server instead of relying on automatic
    /// fallback. Call it after constructing the executor and before running any
    /// invoke action that targets `name`.
    #[cfg(test)]
    pub fn inject_mock_mcp_client(&self, name: &str) {
        let mock_client = Arc::new(McpClient::mock(name));
        self.mcp_pool.inject_mock(name, mock_client);
    }

    /// Build JSON schema instruction for LLM prompts
    ///
    /// When output policy requires JSON format with a schema, this generates
    /// an instruction string to append to the prompt, telling the LLM to
    /// output valid JSON conforming to the schema.
    /// Build JSON schema instruction for LLM prompt injection.
    ///
    /// `cached_example` is used for file-based `from_example` — the caller pre-reads
    /// the file asynchronously and passes the parsed value here for synchronous injection.
    pub(super) fn build_json_schema_instruction(
        output_policy: Option<&OutputPolicy>,
        cached_example: Option<&serde_json::Value>,
    ) -> Option<String> {
        let policy = output_policy?;
        if policy.format != OutputFormat::Json {
            return None;
        }

        // from_example: inject example structure or generic instruction.
        match policy.from_example.as_ref() {
            Some(SchemaRef::Inline(ref example)) => {
                return Self::format_example_instruction(example);
            }
            Some(SchemaRef::File(_)) => {
                // File-based: use cached_example if pre-loaded, otherwise generic instruction.
                if let Some(example) = cached_example {
                    return Self::format_example_instruction(example);
                }
                return Some(
                    "\n\n---\n\
                     CRITICAL OUTPUT REQUIREMENT:\n\
                     Your response MUST be valid JSON.\n\n\
                     Rules:\n\
                     - Output ONLY the JSON object, no additional text\n\
                     - Do NOT wrap in markdown code blocks (no ```json)\n\
                     - Ensure all JSON is properly formatted and valid"
                        .to_string(),
                );
            }
            None => {} // no from_example — fall through to schema-based injection below
        }

        let schema_ref = policy.schema.as_ref()?;
        let schema_json = match schema_ref {
            SchemaRef::Inline(v) => v.clone(),
            SchemaRef::File(_) => {
                return Some(
                    "\n\n---\n\
                     CRITICAL OUTPUT REQUIREMENT:\n\
                     Your response MUST be valid JSON.\n\n\
                     Rules:\n\
                     - Output ONLY the JSON object, no additional text\n\
                     - Do NOT wrap in markdown code blocks (no ```json)\n\
                     - Ensure all JSON is properly formatted and valid"
                        .to_string(),
                );
            }
        };
        let schema_str = serde_json::to_string_pretty(&schema_json).unwrap_or_default();
        Some(format!(
            "\n\n---\n\
             CRITICAL OUTPUT REQUIREMENT:\n\
             Your response MUST be valid JSON that conforms to this schema:\n\n\
             ```json\n{}\n```\n\n\
             Rules:\n\
             - Output ONLY the JSON object, no additional text before or after\n\
             - Do NOT wrap your response in markdown code blocks (no ```json)\n\
             - All required fields must be present\n\
             - Field types must match the schema exactly",
            schema_str
        ))
    }

    /// Render `example` as a prompt instruction requiring JSON with the same structure.
    ///
    /// Returns `None`, after logging a warning, if the example cannot be
    /// pretty-printed.
    fn format_example_instruction(example: &serde_json::Value) -> Option<String> {
        let pretty = serde_json::to_string_pretty(example)
            .map_err(|e| {
                tracing::warn!(
                    "Failed to serialize from_example for prompt injection: {}",
                    e
                );
            })
            .ok()?;

        let instruction = format!(
            "\n\n---\n\
             CRITICAL OUTPUT REQUIREMENT:\n\
             Your response MUST be valid JSON matching this exact structure:\n\n\
             ```json\n{}\n```\n\n\
             Rules:\n\
             - Output ONLY the JSON object, no additional text\n\
             - Do NOT wrap in markdown code blocks (no ```json)\n\
             - All keys shown above must be present\n\
             - Value types must match (strings, numbers, arrays, objects)",
            pretty
        );
        Some(instruction)
    }

    /// Run one task action and return its textual output.
    ///
    /// Dispatches on the action variant to the matching verb (`run_infer`,
    /// `run_exec`, `run_fetch`, `run_invoke`, `run_agent`).
    ///
    /// * `datastore` — required for resolving lazy bindings during template
    ///   substitution.
    /// * `output_policy` — forwarded only to `infer` and `agent`, which use it to
    ///   inject JSON schema instructions into their prompts.
    ///
    /// The tracing span records `task_id` plus the `action_type` label. The action
    /// itself is skipped: by default `#[instrument]` would Debug-format the whole
    /// action into the span, and it can be large or contain sensitive content
    /// (e.g. prompt text).
    ///
    /// # Errors
    ///
    /// Propagates the error returned by the selected verb.
    #[instrument(
        skip(self, action, bindings, datastore, output_policy),
        fields(action_type = %action_type(action))
    )]
    pub async fn execute(
        &self,
        task_id: &Arc<str>,
        action: &TaskAction,
        bindings: &ResolvedBindings,
        datastore: &RunContext,
        output_policy: Option<&OutputPolicy>,
    ) -> Result<String, NikaError> {
        debug!("Running task action");
        match action {
            TaskAction::Infer { infer } => {
                self.run_infer(task_id, infer, bindings, datastore, output_policy)
                    .await
            }
            TaskAction::Exec { exec: e } => self.run_exec(task_id, e, bindings, datastore).await,
            TaskAction::Fetch { fetch } => {
                self.run_fetch(task_id, fetch, bindings, datastore).await
            }
            TaskAction::Invoke { invoke } => {
                self.run_invoke(task_id, invoke, bindings, datastore).await
            }
            TaskAction::Agent { agent } => {
                self.run_agent(task_id, agent, bindings, datastore, output_policy)
                    .await
            }
        }
    }

    /// Return the rig-core provider for `name`, creating and caching it on first use.
    ///
    /// The cache key is the canonical id from `core::find_provider()`, so an alias
    /// ("claude") and its canonical name ("anthropic") share one entry. Names it
    /// does not recognise are cached under the name as given. Construction goes
    /// through [`RigProvider::from_name()`] with `name` exactly as passed in.
    ///
    /// A `ProviderInitialized` event (`cached: false`) is emitted only when a new
    /// provider is created; cache hits emit nothing.
    ///
    /// # Errors
    ///
    /// Returns the error from `RigProvider::from_name()`; nothing is cached in that case.
    pub(super) fn get_rig_provider(&self, name: &str) -> Result<RigProvider, NikaError> {
        use dashmap::mapref::entry::Entry;

        let canonical = crate::core::find_provider(name)
            .map(|p| p.id)
            .unwrap_or(name);

        let slot = match self.rig_provider_cache.entry(canonical.to_string()) {
            Entry::Occupied(hit) => return Ok(hit.get().clone()),
            Entry::Vacant(slot) => slot,
        };

        let provider = RigProvider::from_name(name)?;
        slot.insert(provider.clone());

        // EMIT: ProviderInitialized (cache miss — first use)
        self.event_log.emit(EventKind::ProviderInitialized {
            provider: canonical.to_string(),
            model: provider.default_model().to_string(),
            cached: false,
        });
        Ok(provider)
    }

    /// The default provider name this executor was constructed with.
    pub fn default_provider(&self) -> &str {
        self.default_provider.as_ref()
    }

    /// Get the MCP client for server `name`, connecting lazily on first use.
    ///
    /// Delegates to [`McpClientPool::get_or_connect`], which keeps one `OnceCell` per
    /// server in a `DashMap`. Concurrent callers, such as parallel `for_each`
    /// iterations, therefore share a single initialisation, and only one client is
    /// created per server. The pool also handles event logging.
    ///
    /// # Errors
    ///
    /// Returns the pool's error converted into [`NikaError`].
    pub(super) async fn get_mcp_client(&self, name: &str) -> Result<Arc<McpClient>, NikaError> {
        let connected = self.mcp_pool.get_or_connect(name).await;
        connected.map_err(Into::into)
    }

    /// Gracefully close every MCP server connection held by the pool.
    ///
    /// Delegates to [`McpClientPool::shutdown_all`], which terminates the server
    /// processes and marks the pool as shut down. It is idempotent, so calling it
    /// more than once is safe.
    pub async fn shutdown_mcp(&self) {
        let pool = &self.mcp_pool;
        pool.shutdown_all().await;
    }
}

/// Get action type as string for tracing
pub(super) fn action_type(action: &TaskAction) -> &'static str {
    match action {
        TaskAction::Infer { .. } => "infer",
        TaskAction::Exec { .. } => "exec",
        TaskAction::Fetch { .. } => "fetch",
        TaskAction::Invoke { .. } => "invoke",
        TaskAction::Agent { .. } => "agent",
    }
}