use async_trait::async_trait;
use clap::{CommandFactory, Parser};
use crate::interpreter::{ExecResult, OutputData};
use crate::scheduler::{extract_items, parse_scatter_options};
use crate::tools::{schema_from_clap, ExecContext, ToolCtx, GlobalFlags, Tool, ToolArgs, ToolSchema};
pub struct Scatter;
#[derive(Parser, Debug)]
#[command(name = "scatter", about = "Fan out input items for parallel processing")]
struct ScatterArgs {
#[arg(id = "as", long = "as")]
_as: Option<String>,
#[arg(id = "limit", long = "limit")]
_limit: Option<String>,
#[arg(id = "timeout", long = "timeout")]
_timeout: Option<String>,
#[command(flatten)]
global: GlobalFlags,
#[arg(hide = true)]
rest: Vec<String>,
}
#[async_trait]
impl Tool for Scatter {
fn name(&self) -> &str {
"scatter"
}
fn schema(&self) -> ToolSchema {
schema_from_clap(
&ScatterArgs::command(),
"scatter",
"Fan out input items for parallel processing",
[
("Parallel processing", "seq 1 10 | scatter | echo ${ITEM} | gather"),
("Custom variable name", "split \"a,b,c\" \",\" | scatter --as X | echo ${X} | gather"),
("Per-worker timeout", "seq 1 5 | scatter --timeout 2s | sleep 60 | gather"),
],
)
}
async fn execute(&self, args: ToolArgs, ctx: &mut dyn ToolCtx) -> ExecResult {
let Some(ctx) = ctx.as_any_mut().downcast_mut::<ExecContext>() else {
return ExecResult::failure(1, "internal error: kernel builtin requires ExecContext");
};
let parsed = match ScatterArgs::try_parse_from(
std::iter::once("scatter".to_string()).chain(args.to_argv()),
) {
Ok(p) => p,
Err(e) => return ExecResult::failure(2, format!("scatter: {e}")),
};
parsed.global.apply(ctx);
let opts = parse_scatter_options(&args);
let data = ctx.take_stdin_data();
let text = ctx.read_stdin_to_string().await.unwrap_or_default();
let items = match extract_items(data.as_ref(), &text) {
Ok(items) => items,
Err(msg) => return ExecResult::failure(1, msg),
};
if items.is_empty() {
return ExecResult::success("");
}
let output = format!(
"scatter: {} items (as=${}, limit={})\n{}",
items.len(),
opts.var_name,
opts.limit,
items.join("\n")
);
ExecResult::with_output(OutputData::text(output))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::ast::Value;
#[tokio::test]
async fn test_scatter_with_structured_data() {
use crate::vfs::{MemoryFs, VfsRouter};
use std::sync::Arc;
let mut vfs = VfsRouter::new();
vfs.mount("/", MemoryFs::new());
let mut ctx = ExecContext::new(Arc::new(vfs));
let data = Value::Json(serde_json::json!(["item1", "item2", "item3"]));
ctx.set_stdin_with_data("item1\nitem2\nitem3".to_string(), Some(data));
let result = Scatter.execute(ToolArgs::new(), &mut ctx).await;
assert!(result.ok());
assert!(result.text_out().contains("3 items"));
assert!(result.text_out().contains("item1"));
}
#[tokio::test]
async fn test_scatter_empty_input() {
use crate::vfs::{MemoryFs, VfsRouter};
use std::sync::Arc;
let mut vfs = VfsRouter::new();
vfs.mount("/", MemoryFs::new());
let mut ctx = ExecContext::new(Arc::new(vfs));
let result = Scatter.execute(ToolArgs::new(), &mut ctx).await;
assert!(result.ok());
assert!(result.text_out().is_empty());
}
#[tokio::test]
async fn test_scatter_multiline_fans_out_per_line() {
use crate::vfs::{MemoryFs, VfsRouter};
use std::sync::Arc;
let mut vfs = VfsRouter::new();
vfs.mount("/", MemoryFs::new());
let mut ctx = ExecContext::new(Arc::new(vfs));
ctx.set_stdin("a\nb\nc".to_string());
let result = Scatter.execute(ToolArgs::new(), &mut ctx).await;
assert!(result.ok());
assert!(result.text_out().contains("3 items"), "should fan out to 3 items: {}", result.text_out());
}
#[tokio::test]
async fn test_scatter_single_line_text() {
use crate::vfs::{MemoryFs, VfsRouter};
use std::sync::Arc;
let mut vfs = VfsRouter::new();
vfs.mount("/", MemoryFs::new());
let mut ctx = ExecContext::new(Arc::new(vfs));
ctx.set_stdin("hello".to_string());
let result = Scatter.execute(ToolArgs::new(), &mut ctx).await;
assert!(result.ok());
assert!(result.text_out().contains("1 items"));
assert!(result.text_out().contains("hello"));
}
#[tokio::test]
async fn test_scatter_with_options() {
use crate::vfs::{MemoryFs, VfsRouter};
use std::sync::Arc;
let mut vfs = VfsRouter::new();
vfs.mount("/", MemoryFs::new());
let mut ctx = ExecContext::new(Arc::new(vfs));
let data = Value::Json(serde_json::json!(["a", "b"]));
ctx.set_stdin_with_data("a\nb".to_string(), Some(data));
let mut args = ToolArgs::new();
args.named.insert("as".to_string(), Value::String("URL".to_string()));
args.named.insert("limit".to_string(), Value::Int(4));
let result = Scatter.execute(args, &mut ctx).await;
assert!(result.ok());
assert!(result.text_out().contains("as=$URL"));
assert!(result.text_out().contains("limit=4"));
}
}