nika_engine/runtime/executor/
decompose.rs1use tracing::{debug, instrument};
9
10use crate::ast::decompose::{DecomposeSpec, DecomposeStrategy};
11use crate::binding::ResolvedBindings;
12use crate::error::NikaError;
13use crate::store::RunContext;
14
15use super::TaskExecutor;
16
17impl TaskExecutor {
18 #[instrument(name = "expand_decompose", skip(self, bindings, datastore), fields(
23 strategy = ?spec.strategy,
24 traverse = %spec.traverse,
25 source = %spec.source
26 ))]
27 pub async fn expand_decompose(
28 &self,
29 spec: &DecomposeSpec,
30 bindings: &ResolvedBindings,
31 datastore: &RunContext,
32 ) -> Result<Vec<serde_json::Value>, NikaError> {
33 match spec.strategy {
34 DecomposeStrategy::Semantic => {
35 self.expand_decompose_semantic(spec, bindings, datastore)
36 .await
37 }
38 DecomposeStrategy::Static => self.expand_decompose_static(spec, bindings, datastore),
39 DecomposeStrategy::Nested => {
40 self.expand_decompose_nested(spec, bindings, datastore)
41 .await
42 }
43 }
44 }
45
46 async fn expand_decompose_semantic(
48 &self,
49 spec: &DecomposeSpec,
50 bindings: &ResolvedBindings,
51 datastore: &RunContext,
52 ) -> Result<Vec<serde_json::Value>, NikaError> {
53 use serde_json::{json, Value};
54
55 let server_name = spec.mcp_server();
57 let client = self.get_mcp_client(server_name).await?;
58
59 let source_value = self.resolve_decompose_source(&spec.source, bindings, datastore)?;
61 let source_key = self.extract_decompose_key(&source_value)?;
62
63 debug!(
64 source_key = %source_key,
65 arc = %spec.traverse,
66 "Calling novanet_search (walk) for decompose"
67 );
68
69 let params = json!({
71 "mode": "walk",
72 "start_key": source_key,
73 "arc_kinds": [spec.traverse],
74 "direction": "outgoing"
75 });
76
77 let result = client.call_tool("novanet_search", params).await?;
78
79 let result_json: Value =
81 serde_json::from_str(&result.text()).map_err(|e| NikaError::McpInvalidResponse {
82 tool: "novanet_search".to_string(),
83 reason: format!("failed to parse JSON response: {}", e),
84 })?;
85
86 let mut items = self.extract_decompose_nodes(result_json)?;
88
89 if let Some(max) = spec.max_items {
91 items.truncate(max);
92 }
93
94 debug!(
95 count = items.len(),
96 max_items = ?spec.max_items,
97 "Decompose expanded to items"
98 );
99
100 Ok(items)
101 }
102
103 fn expand_decompose_static(
105 &self,
106 spec: &DecomposeSpec,
107 bindings: &ResolvedBindings,
108 datastore: &RunContext,
109 ) -> Result<Vec<serde_json::Value>, NikaError> {
110 let source_value = self.resolve_decompose_source(&spec.source, bindings, datastore)?;
111
112 let items = source_value
114 .as_array()
115 .ok_or_else(|| NikaError::BindingTypeMismatch {
116 expected: "array".to_string(),
117 actual: self.json_type_name(&source_value),
118 path: spec.source.clone(),
119 })?
120 .clone();
121
122 let mut items = items;
124 if let Some(max) = spec.max_items {
125 items.truncate(max);
126 }
127
128 Ok(items)
129 }
130
131 async fn expand_decompose_nested(
136 &self,
137 spec: &DecomposeSpec,
138 bindings: &ResolvedBindings,
139 datastore: &RunContext,
140 ) -> Result<Vec<serde_json::Value>, NikaError> {
141 use serde_json::{json, Value};
142 use std::collections::HashSet;
143
144 let server_name = spec.mcp_server();
146 let client = self.get_mcp_client(server_name).await?;
147
148 let source_value = self.resolve_decompose_source(&spec.source, bindings, datastore)?;
150 let root_key = self.extract_decompose_key(&source_value)?;
151
152 let max_depth = spec.max_depth.unwrap_or(3);
154 let max_items = spec.max_items.unwrap_or(100); debug!(
157 root_key = %root_key,
158 arc = %spec.traverse,
159 max_depth = max_depth,
160 max_items = max_items,
161 "Starting nested decompose traversal"
162 );
163
164 let mut items: Vec<Value> = Vec::new();
166 let mut visited: HashSet<String> = HashSet::new();
167 let mut queue: Vec<(String, usize)> = vec![(root_key.clone(), 0)];
168
169 visited.insert(root_key.clone());
170
171 while let Some((current_key, depth)) = queue.pop() {
172 if depth >= max_depth {
174 continue;
175 }
176
177 if items.len() >= max_items {
179 break;
180 }
181
182 let params = json!({
184 "mode": "walk",
185 "start_key": current_key,
186 "arc_kinds": [spec.traverse],
187 "direction": "outgoing"
188 });
189
190 let result = match client.call_tool("novanet_search", params).await {
191 Ok(r) => r,
192 Err(e) => {
193 debug!(key = %current_key, error = %e, "Traverse failed, skipping node");
194 continue;
195 }
196 };
197
198 let result_json: Value = match serde_json::from_str(&result.text()) {
200 Ok(v) => v,
201 Err(e) => {
202 debug!(key = %current_key, error = %e, "Failed to parse traverse result");
203 continue;
204 }
205 };
206
207 let children = match self.extract_decompose_nodes(result_json) {
209 Ok(c) => c,
210 Err(_) => continue,
211 };
212
213 for child in children {
214 let child_key = match self.extract_decompose_key(&child) {
216 Ok(k) => k,
217 Err(_) => continue,
218 };
219
220 if visited.contains(&child_key) {
222 continue;
223 }
224
225 visited.insert(child_key.clone());
226 items.push(child);
227
228 queue.push((child_key, depth + 1));
230
231 if items.len() >= max_items {
233 break;
234 }
235 }
236 }
237
238 debug!(
239 count = items.len(),
240 visited = visited.len(),
241 "Nested decompose completed"
242 );
243
244 Ok(items)
245 }
246
247 pub(super) fn resolve_decompose_source(
249 &self,
250 source: &str,
251 bindings: &ResolvedBindings,
252 datastore: &RunContext,
253 ) -> Result<serde_json::Value, NikaError> {
254 if source.starts_with("{{with.") && source.ends_with("}}") {
255 let alias = &source[7..source.len() - 2];
257 bindings.get_resolved(alias, datastore)
258 } else if let Some(alias) = source.strip_prefix('$') {
259 if alias.contains('.') {
260 datastore
262 .resolve_path(alias)
263 .ok_or_else(|| NikaError::BindingNotFound {
264 alias: alias.to_string(),
265 })
266 } else {
267 bindings.get_resolved(alias, datastore)
269 }
270 } else {
271 Ok(serde_json::Value::String(source.to_string()))
273 }
274 }
275
276 pub(super) fn extract_decompose_key(
278 &self,
279 value: &serde_json::Value,
280 ) -> Result<String, NikaError> {
281 match value {
282 serde_json::Value::String(s) => Ok(s.clone()),
283 serde_json::Value::Object(obj) => obj
284 .get("key")
285 .and_then(|v| v.as_str())
286 .map(|s| s.to_string())
287 .ok_or_else(|| NikaError::BindingTypeMismatch {
288 expected: "string or object with 'key'".to_string(),
289 actual: "object without 'key'".to_string(),
290 path: "decompose.source".to_string(),
291 }),
292 _ => Err(NikaError::BindingTypeMismatch {
293 expected: "string or object".to_string(),
294 actual: self.json_type_name(value),
295 path: "decompose.source".to_string(),
296 }),
297 }
298 }
299
300 pub(super) fn extract_decompose_nodes(
304 &self,
305 result: serde_json::Value,
306 ) -> Result<Vec<serde_json::Value>, NikaError> {
307 if let serde_json::Value::Object(mut map) = result {
309 if let Some(serde_json::Value::Array(nodes)) = map.remove("nodes") {
310 return Ok(nodes);
311 }
312 if let Some(serde_json::Value::Array(items)) = map.remove("items") {
313 return Ok(items);
314 }
315 if let Some(serde_json::Value::Array(results)) = map.remove("results") {
316 return Ok(results);
317 }
318 } else if let serde_json::Value::Array(arr) = result {
320 return Ok(arr);
321 }
322 Err(NikaError::McpInvalidResponse {
323 tool: "novanet_search".to_string(),
324 reason: "expected nodes/items/results array in response".to_string(),
325 })
326 }
327
328 pub(super) fn json_type_name(&self, value: &serde_json::Value) -> String {
330 match value {
331 serde_json::Value::Null => "null",
332 serde_json::Value::Bool(_) => "boolean",
333 serde_json::Value::Number(_) => "number",
334 serde_json::Value::String(_) => "string",
335 serde_json::Value::Array(_) => "array",
336 serde_json::Value::Object(_) => "object",
337 }
338 .to_string()
339 }
340}