Skip to main content

nika_engine/runtime/executor/
decompose.rs

1//! Decompose expansion strategies
2//!
3//! Expands decompose specs into iteration items via:
4//! - Semantic: MCP walk (novanet_search walk mode)
5//! - Static: Binding resolution (no MCP call)
6//! - Nested: Recursive BFS traversal via MCP
7
8use tracing::{debug, instrument};
9
10use crate::ast::decompose::{DecomposeSpec, DecomposeStrategy};
11use crate::binding::ResolvedBindings;
12use crate::error::NikaError;
13use crate::store::RunContext;
14
15use super::TaskExecutor;
16
17impl TaskExecutor {
18    /// Expand a decompose spec into iteration items
19    ///
20    /// Returns an array of JSON values that can be used as for_each items.
21    /// Supports semantic (MCP traverse), static (binding resolution), and nested strategies.
22    #[instrument(name = "expand_decompose", skip(self, bindings, datastore), fields(
23        strategy = ?spec.strategy,
24        traverse = %spec.traverse,
25        source = %spec.source
26    ))]
27    pub async fn expand_decompose(
28        &self,
29        spec: &DecomposeSpec,
30        bindings: &ResolvedBindings,
31        datastore: &RunContext,
32    ) -> Result<Vec<serde_json::Value>, NikaError> {
33        match spec.strategy {
34            DecomposeStrategy::Semantic => {
35                self.expand_decompose_semantic(spec, bindings, datastore)
36                    .await
37            }
38            DecomposeStrategy::Static => self.expand_decompose_static(spec, bindings, datastore),
39            DecomposeStrategy::Nested => {
40                self.expand_decompose_nested(spec, bindings, datastore)
41                    .await
42            }
43        }
44    }
45
46    /// Expand using semantic traversal via MCP (calls novanet_search walk mode)
47    async fn expand_decompose_semantic(
48        &self,
49        spec: &DecomposeSpec,
50        bindings: &ResolvedBindings,
51        datastore: &RunContext,
52    ) -> Result<Vec<serde_json::Value>, NikaError> {
53        use serde_json::{json, Value};
54
55        // Get MCP client
56        let server_name = spec.mcp_server();
57        let client = self.get_mcp_client(server_name).await?;
58
59        // Resolve source binding
60        let source_value = self.resolve_decompose_source(&spec.source, bindings, datastore)?;
61        let source_key = self.extract_decompose_key(&source_value)?;
62
63        debug!(
64            source_key = %source_key,
65            arc = %spec.traverse,
66            "Calling novanet_search (walk) for decompose"
67        );
68
69        // Call novanet_search with walk mode (replaces novanet_traverse)
70        let params = json!({
71            "mode": "walk",
72            "start_key": source_key,
73            "arc_kinds": [spec.traverse],
74            "direction": "outgoing"
75        });
76
77        let result = client.call_tool("novanet_search", params).await?;
78
79        // Parse JSON from result content
80        let result_json: Value =
81            serde_json::from_str(&result.text()).map_err(|e| NikaError::McpInvalidResponse {
82                tool: "novanet_search".to_string(),
83                reason: format!("failed to parse JSON response: {}", e),
84            })?;
85
86        // Extract nodes from result (pass ownership to avoid clone)
87        let mut items = self.extract_decompose_nodes(result_json)?;
88
89        // Apply max_items limit
90        if let Some(max) = spec.max_items {
91            items.truncate(max);
92        }
93
94        debug!(
95            count = items.len(),
96            max_items = ?spec.max_items,
97            "Decompose expanded to items"
98        );
99
100        Ok(items)
101    }
102
103    /// Expand using static binding resolution (no MCP call)
104    fn expand_decompose_static(
105        &self,
106        spec: &DecomposeSpec,
107        bindings: &ResolvedBindings,
108        datastore: &RunContext,
109    ) -> Result<Vec<serde_json::Value>, NikaError> {
110        let source_value = self.resolve_decompose_source(&spec.source, bindings, datastore)?;
111
112        // Expect array
113        let items = source_value
114            .as_array()
115            .ok_or_else(|| NikaError::BindingTypeMismatch {
116                expected: "array".to_string(),
117                actual: self.json_type_name(&source_value),
118                path: spec.source.clone(),
119            })?
120            .clone();
121
122        // Apply max_items limit
123        let mut items = items;
124        if let Some(max) = spec.max_items {
125            items.truncate(max);
126        }
127
128        Ok(items)
129    }
130
131    /// Expand using nested recursive traversal via MCP
132    ///
133    /// Recursively follows arcs until max_depth or no more children.
134    /// Uses BFS to collect all descendant nodes (excluding root) into a flat array.
135    async fn expand_decompose_nested(
136        &self,
137        spec: &DecomposeSpec,
138        bindings: &ResolvedBindings,
139        datastore: &RunContext,
140    ) -> Result<Vec<serde_json::Value>, NikaError> {
141        use serde_json::{json, Value};
142        use std::collections::HashSet;
143
144        // Get MCP client
145        let server_name = spec.mcp_server();
146        let client = self.get_mcp_client(server_name).await?;
147
148        // Resolve source binding
149        let source_value = self.resolve_decompose_source(&spec.source, bindings, datastore)?;
150        let root_key = self.extract_decompose_key(&source_value)?;
151
152        // Defaults for nested traversal
153        let max_depth = spec.max_depth.unwrap_or(3);
154        let max_items = spec.max_items.unwrap_or(100); // Safety limit
155
156        debug!(
157            root_key = %root_key,
158            arc = %spec.traverse,
159            max_depth = max_depth,
160            max_items = max_items,
161            "Starting nested decompose traversal"
162        );
163
164        // BFS traversal to collect all descendant nodes
165        let mut items: Vec<Value> = Vec::new();
166        let mut visited: HashSet<String> = HashSet::new();
167        let mut queue: Vec<(String, usize)> = vec![(root_key.clone(), 0)];
168
169        visited.insert(root_key.clone());
170
171        while let Some((current_key, depth)) = queue.pop() {
172            // Stop if we've reached max depth
173            if depth >= max_depth {
174                continue;
175            }
176
177            // Stop if we have enough items
178            if items.len() >= max_items {
179                break;
180            }
181
182            // Call novanet_search (walk mode) for current node
183            let params = json!({
184                "mode": "walk",
185                "start_key": current_key,
186                "arc_kinds": [spec.traverse],
187                "direction": "outgoing"
188            });
189
190            let result = match client.call_tool("novanet_search", params).await {
191                Ok(r) => r,
192                Err(e) => {
193                    debug!(key = %current_key, error = %e, "Traverse failed, skipping node");
194                    continue;
195                }
196            };
197
198            // Parse result
199            let result_json: Value = match serde_json::from_str(&result.text()) {
200                Ok(v) => v,
201                Err(e) => {
202                    debug!(key = %current_key, error = %e, "Failed to parse traverse result");
203                    continue;
204                }
205            };
206
207            // Extract child nodes (pass ownership to avoid clone)
208            let children = match self.extract_decompose_nodes(result_json) {
209                Ok(c) => c,
210                Err(_) => continue,
211            };
212
213            for child in children {
214                // Get child key for tracking
215                let child_key = match self.extract_decompose_key(&child) {
216                    Ok(k) => k,
217                    Err(_) => continue,
218                };
219
220                // Skip if already visited (avoid cycles)
221                if visited.contains(&child_key) {
222                    continue;
223                }
224
225                visited.insert(child_key.clone());
226                items.push(child);
227
228                // Add to queue for further traversal
229                queue.push((child_key, depth + 1));
230
231                // Early exit if we have enough items
232                if items.len() >= max_items {
233                    break;
234                }
235            }
236        }
237
238        debug!(
239            count = items.len(),
240            visited = visited.len(),
241            "Nested decompose completed"
242        );
243
244        Ok(items)
245    }
246
247    /// Resolve source binding expression for decompose
248    pub(super) fn resolve_decompose_source(
249        &self,
250        source: &str,
251        bindings: &ResolvedBindings,
252        datastore: &RunContext,
253    ) -> Result<serde_json::Value, NikaError> {
254        if source.starts_with("{{with.") && source.ends_with("}}") {
255            // Template syntax: {{with.alias}} - supports lazy bindings
256            let alias = &source[7..source.len() - 2];
257            bindings.get_resolved(alias, datastore)
258        } else if let Some(alias) = source.strip_prefix('$') {
259            if alias.contains('.') {
260                // Path syntax: $task.field
261                datastore
262                    .resolve_path(alias)
263                    .ok_or_else(|| NikaError::BindingNotFound {
264                        alias: alias.to_string(),
265                    })
266            } else {
267                // Simple alias - supports lazy bindings
268                bindings.get_resolved(alias, datastore)
269            }
270        } else {
271            // Literal value
272            Ok(serde_json::Value::String(source.to_string()))
273        }
274    }
275
276    /// Extract key from source value (string or object with 'key' field)
277    pub(super) fn extract_decompose_key(
278        &self,
279        value: &serde_json::Value,
280    ) -> Result<String, NikaError> {
281        match value {
282            serde_json::Value::String(s) => Ok(s.clone()),
283            serde_json::Value::Object(obj) => obj
284                .get("key")
285                .and_then(|v| v.as_str())
286                .map(|s| s.to_string())
287                .ok_or_else(|| NikaError::BindingTypeMismatch {
288                    expected: "string or object with 'key'".to_string(),
289                    actual: "object without 'key'".to_string(),
290                    path: "decompose.source".to_string(),
291                }),
292            _ => Err(NikaError::BindingTypeMismatch {
293                expected: "string or object".to_string(),
294                actual: self.json_type_name(value),
295                path: "decompose.source".to_string(),
296            }),
297        }
298    }
299
300    /// Extract nodes array from novanet_search walk result
301    /// Extract nodes from decompose result, taking ownership to avoid cloning
302    /// PERF: Takes ownership of Value to avoid cloning arrays
303    pub(super) fn extract_decompose_nodes(
304        &self,
305        result: serde_json::Value,
306    ) -> Result<Vec<serde_json::Value>, NikaError> {
307        // Try to extract from object fields first (no clone needed)
308        if let serde_json::Value::Object(mut map) = result {
309            if let Some(serde_json::Value::Array(nodes)) = map.remove("nodes") {
310                return Ok(nodes);
311            }
312            if let Some(serde_json::Value::Array(items)) = map.remove("items") {
313                return Ok(items);
314            }
315            if let Some(serde_json::Value::Array(results)) = map.remove("results") {
316                return Ok(results);
317            }
318        // Handle direct array case
319        } else if let serde_json::Value::Array(arr) = result {
320            return Ok(arr);
321        }
322        Err(NikaError::McpInvalidResponse {
323            tool: "novanet_search".to_string(),
324            reason: "expected nodes/items/results array in response".to_string(),
325        })
326    }
327
328    /// Get JSON type name for error messages
329    pub(super) fn json_type_name(&self, value: &serde_json::Value) -> String {
330        match value {
331            serde_json::Value::Null => "null",
332            serde_json::Value::Bool(_) => "boolean",
333            serde_json::Value::Number(_) => "number",
334            serde_json::Value::String(_) => "string",
335            serde_json::Value::Array(_) => "array",
336            serde_json::Value::Object(_) => "object",
337        }
338        .to_string()
339    }
340}