use std::collections::HashSet;
use clap::Parser;
use super::commands::{dispatch, BatchInput};
use super::BatchContext;
/// Maximum number of names a pipeline stage will fan out over.
const PIPELINE_FAN_OUT_LIMIT: usize = 50;

/// Returns true when the command in `tokens` can appear after a `|`, i.e. it
/// parses successfully and reports itself pipeable.
///
/// A lone command word gets a dummy `"__probe__"` positional appended so that
/// commands requiring a name argument still parse during the probe.
fn is_pipeable_command(tokens: &[String]) -> bool {
    let mut probe_tokens = tokens.to_vec();
    if probe_tokens.len() == 1 {
        probe_tokens.push("__probe__".to_string());
    }
    matches!(
        BatchInput::try_parse_from(&probe_tokens),
        Ok(input) if input.cmd.is_pipeable()
    )
}
/// Command names accepted on the right-hand side of a pipe; kept in sync with
/// `is_pipeable()` (see `test_pipeable_names_sync`).
const PIPEABLE_NAMES: &[&str] = &[
    "blame", "callers", "callees", "deps", "explain", "similar", "impact", "test-map", "related",
    "scout",
];

/// Renders `PIPEABLE_NAMES` as a comma-separated list for error messages.
fn pipeable_command_names() -> String {
    PIPEABLE_NAMES
        .iter()
        .copied()
        .collect::<Vec<&str>>()
        .join(", ")
}
fn extract_names(val: &serde_json::Value) -> Vec<String> {
let mut seen = HashSet::new();
let mut names = Vec::new();
let mut push = |n: &str| {
if !n.is_empty() && seen.insert(n.to_string()) {
names.push(n.to_string());
}
};
if let Some(name) = val.get("name").and_then(|v| v.as_str()) {
push(name);
}
if val.is_array() {
for n in extract_from_bare_array(val) {
push(&n);
}
return names;
}
for n in extract_from_standard_fields(val) {
push(&n);
}
names
}
/// Pulls each element's `"name"` string out of a bare top-level JSON array
/// (e.g. callers-style output). Non-array input yields an empty vec.
fn extract_from_bare_array(val: &serde_json::Value) -> Vec<String> {
    match val.as_array() {
        Some(items) => items
            .iter()
            .filter_map(|entry| entry.get("name").and_then(|n| n.as_str()))
            .map(str::to_owned)
            .collect(),
        None => Vec::new(),
    }
}
fn extract_from_standard_fields(val: &serde_json::Value) -> Vec<String> {
let obj = match val.as_object() {
Some(o) => o,
None => return Vec::new(),
};
let mut names = Vec::new();
for (key, field_val) in obj {
if key == "name" {
continue;
}
if let Some(arr) = field_val.as_array() {
for item in arr {
if let Some(name) = item.get("name").and_then(|v| v.as_str()) {
names.push(name.to_string());
}
if let Some(inner_obj) = item.as_object() {
for (_, inner_val) in inner_obj {
if let Some(inner_arr) = inner_val.as_array() {
for inner_item in inner_arr {
if let Some(name) = inner_item.get("name").and_then(|v| v.as_str())
{
names.push(name.to_string());
}
}
}
}
}
}
}
}
names
}
/// Splits a token stream on standalone `"|"` tokens into pipeline segments.
///
/// Pipe tokens are dropped; empty segments are preserved (so `"a | | b"` and
/// trailing pipes are detectable by the caller), and empty input yields a
/// single empty segment.
fn split_tokens_by_pipe(tokens: &[String]) -> Vec<Vec<String>> {
    tokens
        .split(|tok| tok == "|")
        .map(|segment| segment.to_vec())
        .collect()
}
pub(crate) fn execute_pipeline(
ctx: &BatchContext,
tokens: &[String],
raw_line: &str,
) -> serde_json::Value {
let _span = tracing::info_span!("pipeline", input = raw_line).entered();
let segments = split_tokens_by_pipe(tokens);
let stage_count = segments.len();
for (i, seg) in segments.iter().enumerate() {
if seg.is_empty() {
return serde_json::json!({"error": format!(
"Empty pipeline segment at position {}", i + 1
)});
}
}
for seg in &segments[1..] {
if !is_pipeable_command(seg) {
let cmd = seg.first().map(|s| s.as_str()).unwrap_or("(empty)");
return serde_json::json!({"error": format!(
"Cannot pipe into '{}' \u{2014} it doesn't accept a function name. \
Pipeable commands: {}",
cmd,
pipeable_command_names()
)});
}
}
for seg in &segments {
if let Some(first) = seg.first() {
let lower = first.to_ascii_lowercase();
if lower == "quit" || lower == "exit" || lower == "help" {
return serde_json::json!({"error": format!(
"'{}' cannot be used in a pipeline", first
)});
}
}
}
let stage0_result = {
let _stage_span = tracing::info_span!(
"pipeline_stage",
stage = 0,
command = segments[0].first().map(|s| s.as_str()).unwrap_or("?"),
)
.entered();
match BatchInput::try_parse_from(&segments[0]) {
Ok(input) => match dispatch(ctx, input.cmd) {
Ok(val) => val,
Err(e) => {
return serde_json::json!({"error": format!(
"Pipeline stage 1 failed: {}", e
)});
}
},
Err(e) => {
return serde_json::json!({"error": format!(
"Pipeline stage 1 parse error: {}", e
)});
}
}
};
let mut current_value = stage0_result;
let mut any_truncated = false;
for (stage_idx, segment) in segments[1..].iter().enumerate() {
let stage_num = stage_idx + 1;
let mut names = extract_names(¤t_value);
tracing::debug!(stage = stage_num, count = names.len(), "Names extracted");
if names.len() > PIPELINE_FAN_OUT_LIMIT {
any_truncated = true;
tracing::info!(
stage = stage_num,
original = names.len(),
limit = PIPELINE_FAN_OUT_LIMIT,
"Fan-out truncated"
);
names.truncate(PIPELINE_FAN_OUT_LIMIT);
}
let total_inputs = names.len();
let _stage_span = tracing::info_span!(
"pipeline_stage",
stage = stage_num + 1, command = segment.first().map(|s| s.as_str()).unwrap_or("?"),
fan_out = total_inputs,
)
.entered();
if names.is_empty() {
return build_pipeline_result(raw_line, stage_count, vec![], vec![], 0, false);
}
let mut results: Vec<(String, serde_json::Value)> = Vec::new();
let mut errors: Vec<(String, String)> = Vec::new();
for name in &names {
let mut cmd_tokens = vec![segment[0].clone(), "--".to_string(), name.clone()];
cmd_tokens.extend_from_slice(&segment[1..]);
match BatchInput::try_parse_from(&cmd_tokens) {
Ok(input) => match dispatch(ctx, input.cmd) {
Ok(val) => results.push((name.clone(), val)),
Err(e) => {
tracing::warn!(name = name, error = %e, "Per-name dispatch failed");
errors.push((name.clone(), e.to_string()));
}
},
Err(e) => {
tracing::warn!(name = name, error = %e, "Per-name parse failed");
errors.push((name.clone(), e.to_string()));
}
}
}
if stage_num == segments.len() - 1 {
return build_pipeline_result(
raw_line,
stage_count,
results,
errors,
total_inputs,
any_truncated,
);
}
let mut merged_names: Vec<String> = Vec::new();
let mut merged_seen = HashSet::new();
'merge: for (_, val) in &results {
for n in extract_names(val) {
if merged_seen.insert(n.clone()) {
merged_names.push(n);
if merged_names.len() >= PIPELINE_FAN_OUT_LIMIT {
break 'merge;
}
}
}
}
let synthetic: Vec<serde_json::Value> = merged_names
.iter()
.map(|n| serde_json::json!({"name": n}))
.collect();
current_value = serde_json::json!({"results": synthetic});
}
serde_json::json!({"error": "Pipeline execution ended unexpectedly"})
}
fn build_pipeline_result(
pipeline_str: &str,
stages: usize,
results: Vec<(String, serde_json::Value)>,
errors: Vec<(String, String)>,
total_inputs: usize,
truncated: bool,
) -> serde_json::Value {
let results_json: Vec<serde_json::Value> = results
.into_iter()
.map(|(input, data)| serde_json::json!({"_input": input, "data": data}))
.collect();
let errors_json: Vec<serde_json::Value> = errors
.into_iter()
.map(|(input, err)| serde_json::json!({"_input": input, "error": err}))
.collect();
serde_json::json!({
"pipeline": pipeline_str,
"stages": stages,
"results": results_json,
"errors": errors_json,
"total_inputs": total_inputs,
"truncated": truncated,
})
}
/// Returns true when the token list contains a standalone `"|"` token
/// (a pipe embedded inside a token, e.g. `"foo|bar"`, does not count).
pub(crate) fn has_pipe_token(tokens: &[String]) -> bool {
    for tok in tokens {
        if tok == "|" {
            return true;
        }
    }
    false
}
#[cfg(test)]
mod tests {
    use super::*;
    use clap::Parser;

    // ---- extract_names: one test per output shape the pipeline consumes ----

    // search-style output: names live under a top-level "results" array.
    #[test]
    fn test_extract_names_search_result() {
        let val = serde_json::json!({
            "results": [{"name": "a", "file": "f.rs"}, {"name": "b", "file": "g.rs"}],
            "query": "test",
            "total": 2
        });
        assert_eq!(extract_names(&val), vec!["a", "b"]);
    }

    // callers-style output: a bare top-level array of objects.
    #[test]
    fn test_extract_names_callers_bare_array() {
        let val = serde_json::json!([{"name": "a", "file": "f.rs"}, {"name": "b", "file": "g.rs"}]);
        assert_eq!(extract_names(&val), vec!["a", "b"]);
    }

    // callees-style output: the top-level "name" comes first, then array fields.
    #[test]
    fn test_extract_names_callees() {
        let val = serde_json::json!({
            "name": "f",
            "calls": [{"name": "a", "line_start": 1}],
            "count": 1
        });
        let names = extract_names(&val);
        assert_eq!(names, vec!["f", "a"]);
    }

    // impact-style output: names gathered across multiple array fields.
    #[test]
    fn test_extract_names_impact() {
        let val = serde_json::json!({
            "name": "f",
            "callers": [{"name": "a"}],
            "tests": [{"name": "b"}],
            "caller_count": 1,
            "test_count": 1
        });
        let names = extract_names(&val);
        assert!(names.contains(&"f".to_string()));
        assert!(names.contains(&"a".to_string()));
        assert!(names.contains(&"b".to_string()));
    }

    // dead-style output: no top-level "name", two array fields.
    #[test]
    fn test_extract_names_dead() {
        let val = serde_json::json!({
            "dead": [{"name": "a"}],
            "possibly_dead_pub": [{"name": "b"}],
            "count": 1,
            "possibly_pub_count": 1
        });
        let names = extract_names(&val);
        assert!(names.contains(&"a".to_string()));
        assert!(names.contains(&"b".to_string()));
    }

    // related-style output: three shared_* array fields.
    #[test]
    fn test_extract_names_related() {
        let val = serde_json::json!({
            "target": "f",
            "shared_callers": [{"name": "a"}],
            "shared_callees": [{"name": "b"}],
            "shared_types": [{"name": "c"}]
        });
        let names = extract_names(&val);
        assert!(names.contains(&"a".to_string()));
        assert!(names.contains(&"b".to_string()));
        assert!(names.contains(&"c".to_string()));
    }

    // trace-style output: names pulled from the "path" array.
    #[test]
    fn test_extract_names_trace() {
        let val = serde_json::json!({
            "source": "s",
            "target": "t",
            "path": [{"name": "a"}, {"name": "b"}],
            "depth": 1
        });
        let names = extract_names(&val);
        assert!(names.contains(&"a".to_string()));
        assert!(names.contains(&"b".to_string()));
    }

    // explain-style output: the top-level "name" must come first in the result.
    #[test]
    fn test_extract_names_explain() {
        let val = serde_json::json!({
            "name": "target",
            "callers": [{"name": "a"}],
            "similar": [{"name": "b"}]
        });
        let names = extract_names(&val);
        assert_eq!(names[0], "target");
        assert!(names.contains(&"a".to_string()));
        assert!(names.contains(&"b".to_string()));
    }

    // Empty result arrays yield no names.
    #[test]
    fn test_extract_names_empty_results() {
        let val = serde_json::json!({"results": [], "query": "x", "total": 0});
        assert!(extract_names(&val).is_empty());
    }

    // stats-style output carries no names at all.
    #[test]
    fn test_extract_names_stats_no_names() {
        let val = serde_json::json!({
            "total_chunks": 100,
            "total_files": 10,
            "notes": 5
        });
        assert!(extract_names(&val).is_empty());
    }

    // Duplicate names are removed, preserving first-seen order.
    #[test]
    fn test_extract_names_dedup() {
        let val = serde_json::json!({
            "results": [{"name": "a"}, {"name": "a"}, {"name": "b"}]
        });
        assert_eq!(extract_names(&val), vec!["a", "b"]);
    }

    // ---- is_pipeable_command: probe parsing of single and multi-token input ----

    // A lone pipeable command word passes via the "__probe__" dummy argument.
    #[test]
    fn test_is_pipeable_callers() {
        assert!(is_pipeable_command(&["callers".to_string()]));
    }

    #[test]
    fn test_is_pipeable_search() {
        assert!(!is_pipeable_command(&[
            "search".to_string(),
            "foo".to_string()
        ]));
    }

    #[test]
    fn test_is_pipeable_stats() {
        assert!(!is_pipeable_command(&["stats".to_string()]));
    }

    // ---- split_tokens_by_pipe / has_pipe_token ----

    #[test]
    fn test_split_tokens_by_pipe() {
        let tokens: Vec<String> = vec!["search", "foo", "|", "callers"]
            .into_iter()
            .map(String::from)
            .collect();
        let segments = split_tokens_by_pipe(&tokens);
        assert_eq!(segments.len(), 2);
        assert_eq!(segments[0], vec!["search", "foo"]);
        assert_eq!(segments[1], vec!["callers"]);
    }

    #[test]
    fn test_split_tokens_three_stages() {
        let tokens: Vec<String> = vec!["search", "foo", "|", "callers", "|", "test-map"]
            .into_iter()
            .map(String::from)
            .collect();
        let segments = split_tokens_by_pipe(&tokens);
        assert_eq!(segments.len(), 3);
        assert_eq!(segments[0], vec!["search", "foo"]);
        assert_eq!(segments[1], vec!["callers"]);
        assert_eq!(segments[2], vec!["test-map"]);
    }

    // Only a standalone "|" token counts as a pipe, not a pipe inside a token.
    #[test]
    fn test_has_pipe_token() {
        let with_pipe: Vec<String> = vec!["search", "foo", "|", "callers"]
            .into_iter()
            .map(String::from)
            .collect();
        assert!(has_pipe_token(&with_pipe));
        let without_pipe: Vec<String> = vec!["search", "foo|bar"]
            .into_iter()
            .map(String::from)
            .collect();
        assert!(!has_pipe_token(&without_pipe));
    }

    // scout-style output: names nested two levels deep (file_groups[].chunks[]).
    #[test]
    fn test_extract_names_scout() {
        let val = serde_json::json!({
            "file_groups": [
                {
                    "file": "src/search.rs",
                    "chunks": [
                        {"name": "search_filtered", "role": "modify_target"},
                        {"name": "resolve_target", "role": "dependency"}
                    ]
                },
                {
                    "file": "src/store.rs",
                    "chunks": [
                        {"name": "open_store", "role": "modify_target"}
                    ]
                }
            ],
            "summary": {"total_files": 2}
        });
        let names = extract_names(&val);
        assert_eq!(names.len(), 3);
        assert!(names.contains(&"search_filtered".to_string()));
        assert!(names.contains(&"resolve_target".to_string()));
        assert!(names.contains(&"open_store".to_string()));
    }

    #[test]
    fn test_is_pipeable_scout() {
        assert!(is_pipeable_command(&[
            "scout".to_string(),
            "foo".to_string()
        ]));
    }

    // ---- consistency between PIPEABLE_NAMES, clap parsing, and is_pipeable() ----

    // Every pipeable command must accept a positional name argument.
    #[test]
    fn test_pipeable_commands_parse_with_name_arg() {
        let pipeable = [
            "blame", "callers", "callees", "deps", "explain", "similar", "impact", "test-map",
            "related", "scout",
        ];
        for cmd in pipeable {
            let result = BatchInput::try_parse_from([cmd, "test_function"]);
            assert!(
                result.is_ok(),
                "Pipeable command '{cmd}' should accept a positional name arg"
            );
            let input = result.unwrap();
            assert!(
                input.cmd.is_pipeable(),
                "'{cmd}' should be pipeable via is_pipeable()"
            );
        }
    }

    // Commands outside PIPEABLE_NAMES must not report themselves pipeable.
    #[test]
    fn test_non_pipeable_not_pipeable() {
        let non_pipeable_cmds = [
            ("search", vec!["search", "foo"]),
            ("gather", vec!["gather", "foo"]),
            ("dead", vec!["dead"]),
            ("stats", vec!["stats"]),
            ("stale", vec!["stale"]),
            ("health", vec!["health"]),
            ("context", vec!["context", "path"]),
        ];
        for (label, tokens) in non_pipeable_cmds {
            let tokens: Vec<String> = tokens.into_iter().map(String::from).collect();
            let result = BatchInput::try_parse_from(&tokens);
            if let Ok(input) = result {
                assert!(!input.cmd.is_pipeable(), "'{label}' should NOT be pipeable");
            }
        }
    }

    // Two-way sync: every PIPEABLE_NAMES entry is pipeable, and every pipeable
    // subcommand discovered via clap appears in PIPEABLE_NAMES.
    #[test]
    fn test_pipeable_names_sync() {
        for name in PIPEABLE_NAMES {
            let result = BatchInput::try_parse_from([*name, "test_arg"]);
            assert!(
                result.is_ok(),
                "PIPEABLE_NAMES entry '{name}' failed to parse"
            );
            assert!(
                result.unwrap().cmd.is_pipeable(),
                "PIPEABLE_NAMES entry '{name}' not pipeable via is_pipeable()"
            );
        }
        use clap::CommandFactory;
        let app = BatchInput::command();
        let pipeable_set: std::collections::HashSet<&str> =
            PIPEABLE_NAMES.iter().copied().collect();
        for sc in app.get_subcommands() {
            let name = sc.get_name();
            // Subcommands that don't parse with a single positional arg are
            // skipped — they can't be piped into anyway.
            let Ok(input) = BatchInput::try_parse_from([name, "test_arg"]) else {
                continue;
            };
            if input.cmd.is_pipeable() {
                assert!(
                    pipeable_set.contains(name),
                    "Command '{name}' is pipeable via is_pipeable() but missing from PIPEABLE_NAMES"
                );
            }
        }
    }

    #[test]
    fn test_is_pipeable_command_rejects_non_pipeable() {
        assert!(!is_pipeable_command(&[
            "search".to_string(),
            "foo".to_string()
        ]));
        assert!(!is_pipeable_command(&["dead".to_string()]));
        assert!(!is_pipeable_command(&["stats".to_string()]));
    }

    #[test]
    fn test_is_pipeable_command_accepts_pipeable() {
        assert!(is_pipeable_command(&[
            "callers".to_string(),
            "foo".to_string()
        ]));
        assert!(is_pipeable_command(&[
            "callees".to_string(),
            "bar".to_string()
        ]));
        assert!(is_pipeable_command(&[
            "impact".to_string(),
            "baz".to_string()
        ]));
    }

    // Empty token list must be rejected, not panic.
    #[test]
    fn test_is_pipeable_command_empty() {
        assert!(!is_pipeable_command(&[]));
    }

    #[test]
    fn test_pipeable_command_names_string() {
        let names = pipeable_command_names();
        assert!(names.contains("callers"));
        assert!(names.contains("callees"));
        assert!(names.contains("blame"));
        assert!(!names.contains("search"));
        assert!(!names.contains("dead"));
        assert!(!names.contains("stats"));
    }
}