use std::collections::HashMap;
use std::io::{IsTerminal, Read};
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, OnceLock};
use serde_json::Value;
use thiserror::Error;
use crate::security::AuditLogger;
#[derive(Debug, Error)]
pub enum CliError {
#[error("invalid module id: {0}")]
InvalidModuleId(String),
#[error("reserved module id: '{0}' conflicts with a built-in command name")]
ReservedModuleId(String),
#[error("stdin read error: {0}")]
StdinRead(String),
#[error("json parse error: {0}")]
JsonParse(String),
#[error("input too large (limit {limit} bytes, got {actual} bytes)")]
InputTooLarge { limit: usize, actual: usize },
#[error("expected JSON object, got a different type")]
NotAnObject,
#[error("schema $ref resolution failed for module '{module_id}': {source}")]
SchemaRefResolution {
module_id: String,
source: crate::ref_resolver::RefResolverError,
},
}
impl CliError {
pub fn exit_code(&self) -> i32 {
match self {
CliError::SchemaRefResolution { .. } => crate::EXIT_SCHEMA_CIRCULAR_REF,
_ => crate::EXIT_INVALID_INPUT,
}
}
}
static VERBOSE_HELP: AtomicBool = AtomicBool::new(false);
pub fn set_verbose_help(verbose: bool) {
VERBOSE_HELP.store(verbose, Ordering::Relaxed);
}
pub fn is_verbose_help() -> bool {
VERBOSE_HELP.load(Ordering::Relaxed)
}
static DOCS_URL: Mutex<Option<String>> = Mutex::new(None);
pub fn set_docs_url(url: Option<String>) {
if let Ok(mut guard) = DOCS_URL.lock() {
*guard = url;
}
}
pub fn get_docs_url() -> Option<String> {
match DOCS_URL.lock() {
Ok(guard) => guard.clone(),
Err(_) => None,
}
}
static AUDIT_LOGGER: Mutex<Option<AuditLogger>> = Mutex::new(None);
static EXECUTABLES: OnceLock<HashMap<String, PathBuf>> = OnceLock::new();
pub fn set_executables(map: HashMap<String, PathBuf>) {
let _ = EXECUTABLES.set(map);
}
pub fn set_audit_logger(audit_logger: Option<AuditLogger>) {
match AUDIT_LOGGER.lock() {
Ok(mut guard) => {
*guard = audit_logger;
}
Err(_poisoned) => {
tracing::warn!("AUDIT_LOGGER mutex poisoned — audit logger not updated");
}
}
}
fn audit_log_entry(module_id: &str, input: &Value, status: &str, exit_code: i32, duration_ms: u64) {
if let Ok(guard) = AUDIT_LOGGER.lock() {
if let Some(logger) = guard.as_ref() {
logger.log_execution(module_id, input, status, exit_code, duration_ms);
}
}
}
fn audit_success(module_id: &str, input: &Value, duration_ms: u64) {
audit_log_entry(module_id, input, "success", 0, duration_ms);
}
fn audit_error(module_id: &str, input: &Value, exit_code: i32, duration_ms: u64) {
audit_log_entry(module_id, input, "error", exit_code, duration_ms);
}
pub fn add_dispatch_flags(cmd: clap::Command) -> clap::Command {
use clap::{Arg, ArgAction};
let hide = !is_verbose_help();
cmd.arg(
Arg::new("input")
.long("input")
.value_name("SOURCE")
.help(
"Read JSON input from a file path, \
or use '-' to read from stdin pipe",
)
.hide(hide),
)
.arg(
Arg::new("yes")
.long("yes")
.short('y')
.action(ArgAction::SetTrue)
.help(
"Skip interactive approval prompts \
(for scripts and CI)",
)
.hide(hide),
)
.arg(
Arg::new("large-input")
.long("large-input")
.action(ArgAction::SetTrue)
.help(
"Allow stdin input larger than 10MB \
(default limit protects against \
accidental pipes)",
)
.hide(hide),
)
.arg(
Arg::new("format")
.long("format")
.value_parser(["table", "json", "csv", "yaml", "jsonl"])
.help(
"Output format: json, table, csv, \
yaml, jsonl.",
)
.hide(hide),
)
.arg(
Arg::new("fields")
.long("fields")
.value_name("FIELDS")
.help(
"Comma-separated dot-paths to select \
from the result (e.g., 'status,data.count').",
)
.hide(hide),
)
.arg(
Arg::new("sandbox")
.long("sandbox")
.action(ArgAction::SetTrue)
.help(
"Run module in an isolated subprocess \
with restricted filesystem and env \
access",
)
.hide(true),
)
.arg(
Arg::new("dry-run")
.long("dry-run")
.action(ArgAction::SetTrue)
.help(
"Run preflight checks without executing \
the module. Shows validation results.",
)
.hide(hide),
)
.arg(
Arg::new("trace")
.long("trace")
.action(ArgAction::SetTrue)
.help(
"Show execution pipeline trace with \
per-step timing after the result.",
)
.hide(hide),
)
.arg(
Arg::new("stream")
.long("stream")
.action(ArgAction::SetTrue)
.help(
"Stream module output as JSONL (one JSON \
object per line, flushed immediately).",
)
.hide(hide),
)
.arg(
Arg::new("strategy")
.long("strategy")
.value_parser(["standard", "internal", "testing", "performance", "minimal"])
.value_name("STRATEGY")
.help(
"Execution pipeline strategy: standard \
(default), internal, testing, performance.",
)
.hide(hide),
)
.arg(
Arg::new("approval-timeout")
.long("approval-timeout")
.value_name("SECONDS")
.help(
"Override approval prompt timeout in \
seconds (default: 60).",
)
.hide(hide),
)
.arg(
Arg::new("approval-token")
.long("approval-token")
.value_name("TOKEN")
.help(
"Resume a pending approval with the \
given token (for async approval flows).",
)
.hide(hide),
)
}
pub fn exec_command() -> clap::Command {
use clap::{Arg, Command};
let cmd = Command::new("exec").about("Execute an apcore module").arg(
Arg::new("module_id")
.required(true)
.value_name("MODULE_ID")
.help("Fully-qualified module ID to execute"),
);
add_dispatch_flags(cmd)
}
const RESERVED_FLAG_NAMES: &[&str] = &[
"approval-timeout",
"approval-token",
"dry-run",
"fields",
"format",
"input",
"large-input",
"sandbox",
"strategy",
"stream",
"trace",
"verbose",
"yes",
];
pub fn build_module_command(
module_def: &apcore::registry::registry::ModuleDescriptor,
) -> Result<clap::Command, CliError> {
build_module_command_with_limit(module_def, crate::schema_parser::HELP_TEXT_MAX_LEN)
}
pub fn build_module_command_with_limit(
module_def: &apcore::registry::registry::ModuleDescriptor,
help_text_max_length: usize,
) -> Result<clap::Command, CliError> {
let module_id = &module_def.module_id;
if crate::builtin_group::RESERVED_GROUP_NAMES.contains(&module_id.as_str()) {
return Err(CliError::ReservedModuleId(module_id.clone()));
}
let resolved_schema = crate::ref_resolver::resolve_refs(
&module_def.input_schema,
crate::ref_resolver::MAX_REF_DEPTH,
module_id,
)
.map_err(|e| CliError::SchemaRefResolution {
module_id: module_id.clone(),
source: e,
})?;
let schema_args = crate::schema_parser::schema_to_clap_args_with_limit(
&resolved_schema,
help_text_max_length,
)
.map_err(|e| CliError::InvalidModuleId(format!("schema parse error: {e}")))?;
for arg in &schema_args.args {
if let Some(long) = arg.get_long() {
if RESERVED_FLAG_NAMES.contains(&long) {
return Err(CliError::ReservedModuleId(format!(
"module '{module_id}' schema property '{long}' conflicts \
with a reserved CLI option name"
)));
}
}
}
let hide = !is_verbose_help();
let mut footer_parts = Vec::new();
if hide {
footer_parts.push(
"Use --verbose to show all options \
(including built-in apcore options)."
.to_string(),
);
}
if let Some(url) = get_docs_url() {
footer_parts.push(format!("Docs: {url}/commands/{module_id}"));
}
let footer = footer_parts.join("\n");
let mut cmd = add_dispatch_flags(clap::Command::new(module_id.clone()).after_help(footer));
for arg in schema_args.args {
cmd = cmd.arg(arg);
}
Ok(cmd)
}
const STDIN_SIZE_LIMIT_BYTES: usize = 10 * 1024 * 1024;
pub fn collect_input_from_reader<R: Read>(
stdin_flag: Option<&str>,
cli_kwargs: HashMap<String, Value>,
large_input: bool,
mut reader: R,
) -> Result<HashMap<String, Value>, CliError> {
let cli_non_null: HashMap<String, Value> = cli_kwargs
.into_iter()
.filter(|(_, v)| !v.is_null())
.collect();
if stdin_flag != Some("-") {
return Ok(cli_non_null);
}
let mut buf = Vec::new();
reader
.read_to_end(&mut buf)
.map_err(|e| CliError::StdinRead(e.to_string()))?;
if !large_input && buf.len() > STDIN_SIZE_LIMIT_BYTES {
return Err(CliError::InputTooLarge {
limit: STDIN_SIZE_LIMIT_BYTES,
actual: buf.len(),
});
}
if buf.is_empty() {
return Ok(cli_non_null);
}
let stdin_value: Value =
serde_json::from_slice(&buf).map_err(|e| CliError::JsonParse(e.to_string()))?;
let stdin_map = match stdin_value {
Value::Object(m) => m,
_ => return Err(CliError::NotAnObject),
};
let mut merged: HashMap<String, Value> = stdin_map.into_iter().collect();
merged.extend(cli_non_null);
Ok(merged)
}
pub fn collect_input(
stdin_flag: Option<&str>,
cli_kwargs: HashMap<String, Value>,
large_input: bool,
) -> Result<HashMap<String, Value>, CliError> {
match stdin_flag {
None | Some("") => {
collect_input_from_reader(None, cli_kwargs, large_input, std::io::stdin())
}
Some("-") => {
collect_input_from_reader(Some("-"), cli_kwargs, large_input, std::io::stdin())
}
Some(path) => {
let file = std::fs::File::open(path).map_err(|e| {
CliError::StdinRead(format!("cannot open input file '{}': {}", path, e))
})?;
collect_input_from_reader(Some("-"), cli_kwargs, large_input, file)
}
}
}
const MODULE_ID_MAX_LEN: usize = 192;
pub fn validate_module_id(module_id: &str) -> Result<(), CliError> {
if module_id.len() > MODULE_ID_MAX_LEN {
return Err(CliError::InvalidModuleId(format!(
"Invalid module ID format: '{module_id}'. Maximum length is {MODULE_ID_MAX_LEN} characters."
)));
}
if !is_valid_module_id(module_id) {
return Err(CliError::InvalidModuleId(format!(
"Invalid module ID format: '{module_id}'."
)));
}
Ok(())
}
pub fn validate_module_id_or_exit(module_id: &str) {
if let Err(CliError::InvalidModuleId(msg)) = validate_module_id(module_id) {
eprintln!("Error: {msg}");
std::process::exit(crate::EXIT_INVALID_INPUT);
}
}
#[inline]
fn is_valid_module_id(s: &str) -> bool {
if s.is_empty() {
return false;
}
for segment in s.split('.') {
if segment.is_empty() {
return false;
}
let mut chars = segment.chars();
match chars.next() {
Some(c) if c.is_ascii_lowercase() => {}
_ => return false,
}
for c in chars {
if !c.is_ascii_lowercase() && !c.is_ascii_digit() && c != '_' {
return false;
}
}
}
true
}
pub(crate) fn map_apcore_error_to_exit_code(error_code: &str) -> i32 {
use crate::{
EXIT_ACL_DENIED, EXIT_APPROVAL_DENIED, EXIT_CONFIG_BIND_ERROR, EXIT_CONFIG_MOUNT_ERROR,
EXIT_CONFIG_NAMESPACE_RESERVED, EXIT_CONFIG_NOT_FOUND, EXIT_ERROR_FORMATTER_DUPLICATE,
EXIT_MODULE_EXECUTE_ERROR, EXIT_MODULE_NOT_FOUND, EXIT_SCHEMA_CIRCULAR_REF,
EXIT_SCHEMA_VALIDATION_ERROR,
};
match error_code {
"MODULE_NOT_FOUND" | "MODULE_LOAD_ERROR" | "MODULE_DISABLED" => EXIT_MODULE_NOT_FOUND,
"SCHEMA_VALIDATION_ERROR" => EXIT_SCHEMA_VALIDATION_ERROR,
"APPROVAL_DENIED" | "APPROVAL_TIMEOUT" | "APPROVAL_PENDING" => EXIT_APPROVAL_DENIED,
"CONFIG_NOT_FOUND" | "CONFIG_INVALID" => EXIT_CONFIG_NOT_FOUND,
"SCHEMA_CIRCULAR_REF" => EXIT_SCHEMA_CIRCULAR_REF,
"ACL_DENIED" => EXIT_ACL_DENIED,
"CONFIG_NAMESPACE_RESERVED"
| "CONFIG_NAMESPACE_DUPLICATE"
| "CONFIG_ENV_PREFIX_CONFLICT"
| "CONFIG_ENV_MAP_CONFLICT" => EXIT_CONFIG_NAMESPACE_RESERVED,
"CONFIG_MOUNT_ERROR" => EXIT_CONFIG_MOUNT_ERROR,
"CONFIG_BIND_ERROR" => EXIT_CONFIG_BIND_ERROR,
"ERROR_FORMATTER_DUPLICATE" => EXIT_ERROR_FORMATTER_DUPLICATE,
_ => EXIT_MODULE_EXECUTE_ERROR,
}
}
pub(crate) fn map_module_error_to_exit_code(err: &apcore::errors::ModuleError) -> i32 {
let code_str = serde_json::to_value(err.code)
.ok()
.and_then(|v| v.as_str().map(|s| s.to_string()))
.unwrap_or_default();
map_apcore_error_to_exit_code(&code_str)
}
pub(crate) fn validate_against_schema(
input: &HashMap<String, Value>,
schema: &Value,
) -> Result<(), String> {
let mut instance =
serde_json::to_value(input).map_err(|e| format!("failed to serialize input: {e}"))?;
if let (Some(obj), Some(props)) = (
instance.as_object_mut(),
schema.get("properties").and_then(|p| p.as_object()),
) {
for (key, prop) in props {
let type_str = match prop.get("type").and_then(|t| t.as_str()) {
Some(t) => t,
None => continue,
};
if let Some(Value::String(s)) = obj.get(key) {
let coerced = match type_str {
"integer" => s.parse::<i64>().ok().map(Value::from),
"number" => s.parse::<f64>().ok().map(Value::from),
_ => None,
};
if let Some(v) = coerced {
obj.insert(key.clone(), v);
}
}
}
}
let validator =
jsonschema::validator_for(schema).map_err(|e| format!("invalid schema: {e}"))?;
let errors: Vec<String> = validator
.iter_errors(&instance)
.map(|e| e.to_string())
.collect();
match errors.first() {
Some(msg) => Err(msg.clone()),
None => Ok(()),
}
}
fn emit_error_json(
_module_id: &str,
message: &str,
exit_code: i32,
error_data: Option<&serde_json::Value>,
) {
let mut payload = serde_json::json!({
"error": true,
"code": "UNKNOWN",
"message": message,
"exit_code": exit_code,
});
if let Some(data) = error_data {
if let Some(obj) = data.as_object() {
for key in &[
"code",
"message",
"details",
"suggestion",
"ai_guidance",
"retryable",
"user_fixable",
] {
if let Some(val) = obj.get(*key) {
if !val.is_null() {
payload[*key] = val.clone();
}
}
}
}
}
eprintln!("{}", serde_json::to_string(&payload).unwrap_or_default());
}
fn emit_error_tty(
_module_id: &str,
message: &str,
exit_code: i32,
error_data: Option<&serde_json::Value>,
) {
if let Some(code) = error_data
.and_then(|d| d.get("code"))
.and_then(|v| v.as_str())
{
eprintln!("Error [{code}]: {message}");
} else {
eprintln!("Error: {message}");
}
if let Some(details) = error_data
.and_then(|d| d.get("details"))
.and_then(|v| v.as_object())
{
eprintln!("\n Details:");
for (k, v) in details {
eprintln!(" {k}: {v}");
}
}
if let Some(suggestion) = error_data
.and_then(|d| d.get("suggestion"))
.and_then(|v| v.as_str())
{
eprintln!("\n Suggestion: {suggestion}");
}
if let Some(retryable) = error_data
.and_then(|d| d.get("retryable"))
.and_then(|v| v.as_bool())
{
let label = if retryable {
"Yes"
} else {
"No (same input will fail again)"
};
eprintln!(" Retryable: {label}");
}
eprintln!("\n Exit code: {exit_code}");
}
pub fn reconcile_bool_pairs(
matches: &clap::ArgMatches,
bool_pairs: &[crate::schema_parser::BoolFlagPair],
) -> HashMap<String, Value> {
let mut result = HashMap::new();
for pair in bool_pairs {
let pos_set = matches
.try_get_one::<bool>(&pair.prop_name)
.ok()
.flatten()
.copied()
.unwrap_or(false);
let neg_id = format!("no-{}", pair.prop_name);
let neg_set = matches
.try_get_one::<bool>(&neg_id)
.ok()
.flatten()
.copied()
.unwrap_or(false);
let val = if pos_set {
true
} else if neg_set {
false
} else {
pair.default_val
};
result.insert(pair.prop_name.clone(), Value::Bool(val));
}
result
}
fn extract_cli_kwargs(
matches: &clap::ArgMatches,
module_def: &apcore::registry::registry::ModuleDescriptor,
) -> HashMap<String, Value> {
use crate::schema_parser::schema_to_clap_args;
let schema_args = match schema_to_clap_args(&module_def.input_schema, None) {
Ok(sa) => sa,
Err(_) => return HashMap::new(),
};
let mut kwargs: HashMap<String, Value> = HashMap::new();
for arg in &schema_args.args {
let id = arg.get_id().as_str().to_string();
if id.starts_with("no-") {
continue;
}
if let Ok(Some(val)) = matches.try_get_one::<String>(&id) {
kwargs.insert(id, Value::String(val.clone()));
} else if let Ok(Some(val)) = matches.try_get_one::<std::path::PathBuf>(&id) {
kwargs.insert(id, Value::String(val.to_string_lossy().to_string()));
} else {
kwargs.insert(id, Value::Null);
}
}
let bool_vals = reconcile_bool_pairs(matches, &schema_args.bool_pairs);
kwargs.extend(bool_vals);
crate::schema_parser::reconvert_enum_values(kwargs, &schema_args)
}
async fn execute_script(executable: &std::path::Path, input: &Value) -> Result<Value, String> {
use tokio::io::AsyncWriteExt;
let mut child = tokio::process::Command::new(executable)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.kill_on_drop(true)
.spawn()
.map_err(|e| format!("failed to spawn {}: {}", executable.display(), e))?;
if let Some(mut stdin) = child.stdin.take() {
let payload =
serde_json::to_vec(input).map_err(|e| format!("failed to serialize input: {e}"))?;
stdin
.write_all(&payload)
.await
.map_err(|e| format!("failed to write to stdin: {e}"))?;
drop(stdin);
}
let output = child
.wait_with_output()
.await
.map_err(|e| format!("failed to read output: {e}"))?;
if !output.status.success() {
let code = output.status.code().unwrap_or(1);
let stderr_hint = String::from_utf8_lossy(&output.stderr);
return Err(format!(
"script exited with code {code}{}",
if stderr_hint.is_empty() {
String::new()
} else {
format!(": {}", stderr_hint.trim())
}
));
}
serde_json::from_slice(&output.stdout)
.map_err(|e| format!("script stdout is not valid JSON: {e}"))
}
pub async fn dispatch_module(
module_id: &str,
matches: &clap::ArgMatches,
registry: &Arc<dyn crate::discovery::RegistryProvider>,
apcore_executor: &apcore::Executor,
) -> ! {
use crate::{
EXIT_APPROVAL_DENIED, EXIT_INVALID_INPUT, EXIT_MODULE_NOT_FOUND,
EXIT_SCHEMA_VALIDATION_ERROR, EXIT_SIGINT, EXIT_SUCCESS,
};
validate_module_id_or_exit(module_id);
let module_def = match registry.get_module_descriptor(module_id) {
Some(def) => def,
None => {
eprintln!("Error: Module '{module_id}' not found in registry.");
std::process::exit(EXIT_MODULE_NOT_FOUND);
}
};
let stdin_flag = matches.get_one::<String>("input").map(|s| s.as_str());
let auto_approve = matches.get_flag("yes");
let large_input = matches.get_flag("large-input");
let format_flag = matches.get_one::<String>("format").cloned();
let fields_flag = matches.get_one::<String>("fields").cloned();
let dry_run = matches.get_flag("dry-run");
let trace_flag = matches.get_flag("trace");
let stream_flag = matches.get_flag("stream");
let strategy_name = matches.get_one::<String>("strategy").cloned();
let approval_timeout_arg = matches.get_one::<String>("approval-timeout").cloned();
let approval_token = matches.get_one::<String>("approval-token").cloned();
let cli_kwargs = extract_cli_kwargs(matches, &module_def);
let mut merged = match collect_input(stdin_flag, cli_kwargs, large_input) {
Ok(m) => m,
Err(CliError::InputTooLarge { .. }) => {
eprintln!("Error: STDIN input exceeds 10MB limit. Use --large-input to override.");
std::process::exit(EXIT_INVALID_INPUT);
}
Err(CliError::JsonParse(detail)) => {
eprintln!("Error: STDIN does not contain valid JSON: {detail}.");
std::process::exit(EXIT_INVALID_INPUT);
}
Err(CliError::NotAnObject) => {
eprintln!("Error: STDIN JSON must be an object, got array or scalar.");
std::process::exit(EXIT_INVALID_INPUT);
}
Err(e) => {
eprintln!("Error: {e}");
std::process::exit(EXIT_INVALID_INPUT);
}
};
if dry_run {
let show_trace_preview = trace_flag;
let print_pipeline_preview = || {
if show_trace_preview {
let pure_steps = [
"context_creation",
"call_chain_guard",
"module_lookup",
"acl_check",
"input_validation",
];
let all_steps = [
"context_creation",
"call_chain_guard",
"module_lookup",
"acl_check",
"approval_gate",
"middleware_before",
"input_validation",
"execute",
"output_validation",
"middleware_after",
"return_result",
];
eprintln!("\nPipeline preview (dry-run):");
for s in &all_steps {
if pure_steps.contains(s) {
eprintln!(" v {:<24} (pure -- would execute)", s);
} else {
eprintln!(" o {:<24} (impure -- skipped in dry-run)", s);
}
}
}
};
let input_value =
serde_json::to_value(&merged).unwrap_or(Value::Object(Default::default()));
let preflight =
crate::validate::build_preflight_result(apcore_executor, &module_def, &input_value)
.await;
crate::validate::format_preflight_result(&preflight, format_flag.as_deref());
print_pipeline_preview();
let valid = preflight
.get("valid")
.and_then(|v| v.as_bool())
.unwrap_or(false);
if valid {
std::process::exit(EXIT_SUCCESS);
} else {
let checks = preflight
.get("checks")
.and_then(|v| v.as_array())
.cloned()
.unwrap_or_default();
std::process::exit(crate::validate::first_failed_exit_code(&checks));
}
}
if let Some(schema) = module_def.input_schema.as_object() {
if schema.contains_key("properties") {
if let Err(detail) = validate_against_schema(&merged, &module_def.input_schema) {
eprintln!("Error: Validation failed: {detail}.");
std::process::exit(EXIT_SCHEMA_VALIDATION_ERROR);
}
}
}
if let Some(ref token) = approval_token {
merged.insert("_approval_token".to_string(), Value::String(token.clone()));
}
let approval_timeout_secs = approval_timeout_arg
.as_deref()
.and_then(|s| s.parse::<u64>().ok())
.or_else(|| {
let resolver = crate::config::ConfigResolver::new(
None,
Some(std::path::PathBuf::from("apcore.yaml")),
);
resolver
.resolve("cli.approval_timeout", None, None)
.and_then(|s| s.parse::<u64>().ok())
})
.unwrap_or(crate::approval::DEFAULT_APPROVAL_TIMEOUT_SECS);
let module_json = serde_json::to_value(&module_def).unwrap_or_default();
if let Err(e) = crate::approval::check_approval_with_timeout(
&module_json,
auto_approve,
approval_timeout_secs,
)
.await
{
eprintln!("Error: {e}");
std::process::exit(EXIT_APPROVAL_DENIED);
}
let input_value = serde_json::to_value(&merged).unwrap_or(Value::Object(Default::default()));
let use_sandbox = matches.get_flag("sandbox");
let script_executable = EXECUTABLES
.get()
.and_then(|map| map.get(module_id))
.cloned();
if stream_flag {
if format_flag.as_deref() == Some("table") {
eprintln!("Warning: Streaming mode always outputs JSONL; --format table is ignored.");
}
let start = std::time::Instant::now();
if let Some(exec_path) = script_executable.as_ref() {
let res = tokio::select! {
res = execute_script(exec_path, &input_value) => res,
_ = tokio::signal::ctrl_c() => {
eprintln!("Execution cancelled.");
std::process::exit(EXIT_SIGINT);
}
};
let duration_ms = start.elapsed().as_millis() as u64;
match res {
Ok(val) => {
println!("{}", serde_json::to_string(&val).unwrap_or_default());
audit_success(module_id, &input_value, duration_ms);
std::process::exit(EXIT_SUCCESS);
}
Err(e) => {
audit_error(
module_id,
&input_value,
crate::EXIT_MODULE_EXECUTE_ERROR,
duration_ms,
);
eprintln!("Error: {e}");
std::process::exit(crate::EXIT_MODULE_EXECUTE_ERROR);
}
}
}
let res = tokio::select! {
res = apcore_executor.call(
module_id, input_value.clone(), None, None,
) => res,
_ = tokio::signal::ctrl_c() => {
eprintln!("Execution cancelled.");
std::process::exit(EXIT_SIGINT);
}
};
let duration_ms = start.elapsed().as_millis() as u64;
match res {
Ok(val) => {
if let Some(arr) = val.as_array() {
for item in arr {
println!("{}", serde_json::to_string(item).unwrap_or_default());
}
} else {
println!("{}", serde_json::to_string(&val).unwrap_or_default());
}
audit_success(module_id, &input_value, duration_ms);
std::process::exit(EXIT_SUCCESS);
}
Err(e) => {
let code = map_module_error_to_exit_code(&e);
audit_error(module_id, &input_value, code, duration_ms);
eprintln!("Error: Module '{module_id}' execution failed: {e}.");
std::process::exit(code);
}
}
}
if trace_flag {
let start = std::time::Instant::now();
let res = tokio::select! {
res = apcore_executor.call(
module_id,
input_value.clone(),
None,
None,
) => res,
_ = tokio::signal::ctrl_c() => {
eprintln!("Execution cancelled.");
std::process::exit(EXIT_SIGINT);
}
};
let duration_ms = start.elapsed().as_millis() as u64;
match res {
Ok(output) => {
let fmt = crate::output::resolve_format(format_flag.as_deref());
if fmt == "json" {
let trace_data = serde_json::json!({
"strategy": strategy_name.as_deref().unwrap_or("standard"),
"total_duration_ms": duration_ms,
"success": true,
});
let combined = if output.is_object() {
let mut obj = output.as_object().unwrap().clone();
obj.insert("_trace".to_string(), trace_data);
Value::Object(obj)
} else {
serde_json::json!({
"result": output,
"_trace": trace_data,
})
};
println!(
"{}",
serde_json::to_string_pretty(&combined).unwrap_or_default()
);
} else {
let out_str =
crate::output::format_exec_result(&output, fmt, fields_flag.as_deref());
println!("{out_str}");
eprintln!(
"\nPipeline Trace (strategy: {}, {duration_ms}ms)",
strategy_name.as_deref().unwrap_or("standard"),
);
}
audit_success(module_id, &input_value, duration_ms);
std::process::exit(EXIT_SUCCESS);
}
Err(e) => {
let code = map_module_error_to_exit_code(&e);
audit_error(module_id, &input_value, code, duration_ms);
eprintln!("Error: Module '{module_id}' execution failed: {e}.");
std::process::exit(code);
}
}
}
let start = std::time::Instant::now();
let result: Result<Value, (i32, String, Option<Value>)> =
if let Some(exec_path) = script_executable {
tokio::select! {
res = execute_script(&exec_path, &input_value) => {
res.map_err(|e| (crate::EXIT_MODULE_EXECUTE_ERROR, e, None))
}
_ = tokio::signal::ctrl_c() => {
eprintln!("Execution cancelled.");
std::process::exit(EXIT_SIGINT);
}
}
} else if use_sandbox {
let sandbox = crate::security::Sandbox::new(true, 0);
tokio::select! {
res = sandbox.execute(module_id, input_value.clone(), apcore_executor) => {
res.map_err(|e| {
match &e {
crate::security::ModuleExecutionError::ModuleError(inner) => {
let code = map_module_error_to_exit_code(inner);
let data = serde_json::to_value(inner).ok();
(code, e.to_string(), data)
}
_ => (crate::EXIT_MODULE_EXECUTE_ERROR, e.to_string(), None),
}
})
}
_ = tokio::signal::ctrl_c() => {
eprintln!("Execution cancelled.");
std::process::exit(EXIT_SIGINT);
}
}
} else {
tokio::select! {
res = apcore_executor.call(
module_id,
input_value.clone(),
None,
None,
) => {
res.map_err(|e| {
let code = map_module_error_to_exit_code(&e);
let data = serde_json::to_value(&e).ok();
(code, e.to_string(), data)
})
}
_ = tokio::signal::ctrl_c() => {
eprintln!("Execution cancelled.");
std::process::exit(EXIT_SIGINT);
}
}
};
let duration_ms = start.elapsed().as_millis() as u64;
match result {
Ok(output) => {
let fmt = crate::output::resolve_format(format_flag.as_deref());
println!(
"{}",
crate::output::format_exec_result(&output, fmt, fields_flag.as_deref(),)
);
audit_success(module_id, &input_value, duration_ms);
std::process::exit(EXIT_SUCCESS);
}
Err((exit_code, msg, error_data)) => {
audit_error(module_id, &input_value, exit_code, duration_ms);
if format_flag.as_deref() == Some("json") || !std::io::stderr().is_terminal() {
emit_error_json(module_id, &msg, exit_code, error_data.as_ref());
} else {
emit_error_tty(module_id, &msg, exit_code, error_data.as_ref());
}
std::process::exit(exit_code);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_validate_module_id_valid() {
for id in ["math.add", "text.summarize", "a", "a.b.c"] {
let result = validate_module_id(id);
assert!(result.is_ok(), "expected ok for '{id}': {result:?}");
}
}
#[test]
fn test_validate_module_id_too_long() {
let long_id = "a".repeat(193);
assert!(validate_module_id(&long_id).is_err());
}
#[test]
fn test_validate_module_id_invalid_format() {
for id in ["INVALID!ID", "123abc", ".leading.dot", "a..b", "a."] {
assert!(validate_module_id(id).is_err(), "expected error for '{id}'");
}
}
#[test]
fn test_validate_module_id_max_length() {
let max_id = "a".repeat(192);
assert!(validate_module_id(&max_id).is_ok());
}
#[test]
fn test_validate_module_id_over_max_length_message() {
let overlong = "a".repeat(193);
let err = validate_module_id(&overlong).expect_err("expected length error");
assert!(format!("{err:?}").contains("Maximum length"));
}
#[test]
fn test_collect_input_no_stdin_drops_null_values() {
use serde_json::json;
let mut kwargs = HashMap::new();
kwargs.insert("a".to_string(), json!(5));
kwargs.insert("b".to_string(), Value::Null);
let result = collect_input(None, kwargs, false).unwrap();
assert_eq!(result.get("a"), Some(&json!(5)));
assert!(!result.contains_key("b"), "Null values must be dropped");
}
#[test]
fn test_collect_input_stdin_valid_json() {
use serde_json::json;
use std::io::Cursor;
let stdin_bytes = b"{\"x\": 42}";
let reader = Cursor::new(stdin_bytes.to_vec());
let result = collect_input_from_reader(Some("-"), HashMap::new(), false, reader).unwrap();
assert_eq!(result.get("x"), Some(&json!(42)));
}
#[test]
fn test_collect_input_cli_overrides_stdin() {
use serde_json::json;
use std::io::Cursor;
let stdin_bytes = b"{\"a\": 5}";
let reader = Cursor::new(stdin_bytes.to_vec());
let mut kwargs = HashMap::new();
kwargs.insert("a".to_string(), json!(99));
let result = collect_input_from_reader(Some("-"), kwargs, false, reader).unwrap();
assert_eq!(result.get("a"), Some(&json!(99)), "CLI must override STDIN");
}
#[test]
fn test_collect_input_oversized_stdin_rejected() {
use std::io::Cursor;
let big = vec![b' '; 10 * 1024 * 1024 + 1];
let reader = Cursor::new(big);
let err = collect_input_from_reader(Some("-"), HashMap::new(), false, reader).unwrap_err();
assert!(matches!(err, CliError::InputTooLarge { .. }));
}
#[test]
fn test_collect_input_large_input_allowed() {
use std::io::Cursor;
let mut payload = b"{\"k\": \"".to_vec();
payload.extend(vec![b'x'; 11 * 1024 * 1024]);
payload.extend(b"\"}");
let reader = Cursor::new(payload);
let result = collect_input_from_reader(Some("-"), HashMap::new(), true, reader);
assert!(
result.is_ok(),
"large_input=true must accept oversized payload"
);
}
#[test]
fn test_collect_input_invalid_json_returns_error() {
use std::io::Cursor;
let reader = Cursor::new(b"not json at all".to_vec());
let err = collect_input_from_reader(Some("-"), HashMap::new(), false, reader).unwrap_err();
assert!(matches!(err, CliError::JsonParse(_)));
}
#[test]
fn test_collect_input_non_object_json_returns_error() {
use std::io::Cursor;
let reader = Cursor::new(b"[1, 2, 3]".to_vec());
let err = collect_input_from_reader(Some("-"), HashMap::new(), false, reader).unwrap_err();
assert!(matches!(err, CliError::NotAnObject));
}
#[test]
fn test_collect_input_empty_stdin_returns_empty_map() {
use std::io::Cursor;
let reader = Cursor::new(b"".to_vec());
let result = collect_input_from_reader(Some("-"), HashMap::new(), false, reader).unwrap();
assert!(result.is_empty());
}
#[test]
fn test_collect_input_no_stdin_flag_returns_cli_kwargs() {
use serde_json::json;
let mut kwargs = HashMap::new();
kwargs.insert("foo".to_string(), json!("bar"));
let result = collect_input(None, kwargs.clone(), false).unwrap();
assert_eq!(result.get("foo"), Some(&json!("bar")));
}
#[test]
fn test_collect_input_file_path_reads_json() {
use serde_json::json;
use std::io::Write;
let mut tmp = tempfile::NamedTempFile::new().unwrap();
write!(tmp, r#"{{"port": 8080}}"#).unwrap();
let path = tmp.path().to_str().unwrap().to_string();
let result = collect_input(Some(&path), HashMap::new(), false).unwrap();
assert_eq!(result.get("port"), Some(&json!(8080)));
}
#[test]
fn test_collect_input_file_path_cli_overrides_file() {
use serde_json::json;
use std::io::Write;
let mut tmp = tempfile::NamedTempFile::new().unwrap();
write!(tmp, r#"{{"a": 1, "b": 2}}"#).unwrap();
let path = tmp.path().to_str().unwrap().to_string();
let mut kwargs = HashMap::new();
kwargs.insert("a".to_string(), json!(99));
let result = collect_input(Some(&path), kwargs, false).unwrap();
assert_eq!(result.get("a"), Some(&json!(99)), "CLI must override file");
assert_eq!(result.get("b"), Some(&json!(2)));
}
#[test]
fn test_collect_input_file_path_missing_returns_error() {
let err =
collect_input(Some("/nonexistent/path/data.json"), HashMap::new(), false).unwrap_err();
assert!(matches!(err, CliError::StdinRead(_)));
}
fn make_module_descriptor(
name: &str,
description: &str,
schema: Option<serde_json::Value>,
) -> apcore::registry::registry::ModuleDescriptor {
apcore::registry::registry::ModuleDescriptor {
module_id: name.to_string(),
name: None,
description: description.to_string(),
documentation: None,
input_schema: schema.unwrap_or(serde_json::Value::Null),
output_schema: serde_json::Value::Object(Default::default()),
version: "1.0.0".to_string(),
tags: vec![],
annotations: Some(apcore::module::ModuleAnnotations::default()),
examples: vec![],
metadata: std::collections::HashMap::new(),
display: None,
sunset_date: None,
dependencies: vec![],
enabled: true,
}
}
#[test]
fn test_build_module_command_name_is_set() {
let module = make_module_descriptor("math.add", "Add two numbers", None);
let cmd = build_module_command(&module).unwrap();
assert_eq!(cmd.get_name(), "math.add");
}
#[test]
fn test_build_module_command_has_input_flag() {
let module = make_module_descriptor("a.b", "desc", None);
let cmd = build_module_command(&module).unwrap();
let names: Vec<&str> = cmd.get_opts().filter_map(|a| a.get_long()).collect();
assert!(names.contains(&"input"), "must have --input flag");
}
#[test]
fn test_build_module_command_has_yes_flag() {
let module = make_module_descriptor("a.b", "desc", None);
let cmd = build_module_command(&module).unwrap();
let names: Vec<&str> = cmd.get_opts().filter_map(|a| a.get_long()).collect();
assert!(names.contains(&"yes"), "must have --yes flag");
}
#[test]
fn test_build_module_command_has_large_input_flag() {
let module = make_module_descriptor("a.b", "desc", None);
let cmd = build_module_command(&module).unwrap();
let names: Vec<&str> = cmd.get_opts().filter_map(|a| a.get_long()).collect();
assert!(
names.contains(&"large-input"),
"must have --large-input flag"
);
}
#[test]
fn test_build_module_command_has_format_flag() {
let module = make_module_descriptor("a.b", "desc", None);
let cmd = build_module_command(&module).unwrap();
let names: Vec<&str> = cmd.get_opts().filter_map(|a| a.get_long()).collect();
assert!(names.contains(&"format"), "must have --format flag");
}
#[test]
fn test_build_module_command_has_sandbox_flag() {
let module = make_module_descriptor("a.b", "desc", None);
let cmd = build_module_command(&module).unwrap();
let names: Vec<&str> = cmd.get_opts().filter_map(|a| a.get_long()).collect();
assert!(names.contains(&"sandbox"), "must have --sandbox flag");
}
#[test]
fn test_build_module_command_reserved_name_returns_error() {
for reserved in crate::builtin_group::RESERVED_GROUP_NAMES {
let module = make_module_descriptor(reserved, "desc", None);
let result = build_module_command(&module);
assert!(
matches!(result, Err(CliError::ReservedModuleId(_))),
"expected ReservedModuleId for '{reserved}', got {result:?}"
);
}
}
#[test]
fn test_build_module_command_former_builtin_names_allowed() {
for name in &["list", "describe", "exec", "init", "health", "config"] {
let module = make_module_descriptor(name, "desc", None);
let result = build_module_command(&module);
assert!(
result.is_ok(),
"former built-in '{name}' should no longer be reserved; got {result:?}"
);
}
}
#[test]
fn test_build_module_command_yes_has_short_flag() {
let module = make_module_descriptor("a.b", "desc", None);
let cmd = build_module_command(&module).unwrap();
let has_short_y = cmd
.get_opts()
.filter(|a| a.get_long() == Some("yes"))
.any(|a| a.get_short() == Some('y'));
assert!(has_short_y, "--yes must have short flag -y");
}
#[test]
fn test_reserved_group_names_single_entry() {
assert_eq!(crate::builtin_group::RESERVED_GROUP_NAMES, &["apcli"]);
}
#[test]
fn test_apcli_subcommand_names_matches_spec() {
let expected: &[&str] = &[
"list",
"describe",
"exec",
"validate",
"init",
"health",
"usage",
"enable",
"disable",
"reload",
"config",
"completion",
"describe-pipeline",
];
assert_eq!(crate::builtin_group::APCLI_SUBCOMMAND_NAMES, expected);
}
#[test]
fn test_map_error_module_not_found_is_44() {
assert_eq!(map_apcore_error_to_exit_code("MODULE_NOT_FOUND"), 44);
}
#[test]
fn test_map_error_module_load_error_is_44() {
assert_eq!(map_apcore_error_to_exit_code("MODULE_LOAD_ERROR"), 44);
}
#[test]
fn test_map_error_module_disabled_is_44() {
assert_eq!(map_apcore_error_to_exit_code("MODULE_DISABLED"), 44);
}
#[test]
fn test_map_error_schema_validation_error_is_45() {
assert_eq!(map_apcore_error_to_exit_code("SCHEMA_VALIDATION_ERROR"), 45);
}
#[test]
fn test_map_error_approval_denied_is_46() {
assert_eq!(map_apcore_error_to_exit_code("APPROVAL_DENIED"), 46);
}
#[test]
fn test_map_error_approval_timeout_is_46() {
assert_eq!(map_apcore_error_to_exit_code("APPROVAL_TIMEOUT"), 46);
}
#[test]
fn test_map_error_approval_pending_is_46() {
assert_eq!(map_apcore_error_to_exit_code("APPROVAL_PENDING"), 46);
}
#[test]
fn test_map_error_config_not_found_is_47() {
assert_eq!(map_apcore_error_to_exit_code("CONFIG_NOT_FOUND"), 47);
}
#[test]
fn test_map_error_config_invalid_is_47() {
assert_eq!(map_apcore_error_to_exit_code("CONFIG_INVALID"), 47);
}
#[test]
fn test_map_error_schema_circular_ref_is_48() {
assert_eq!(map_apcore_error_to_exit_code("SCHEMA_CIRCULAR_REF"), 48);
}
#[test]
fn test_map_error_acl_denied_is_77() {
assert_eq!(map_apcore_error_to_exit_code("ACL_DENIED"), 77);
}
#[test]
fn test_map_error_module_execute_error_is_1() {
assert_eq!(map_apcore_error_to_exit_code("MODULE_EXECUTE_ERROR"), 1);
}
#[test]
fn test_map_error_module_timeout_is_1() {
assert_eq!(map_apcore_error_to_exit_code("MODULE_TIMEOUT"), 1);
}
#[test]
fn test_map_error_unknown_is_1() {
assert_eq!(map_apcore_error_to_exit_code("SOMETHING_UNEXPECTED"), 1);
}
#[test]
fn test_map_error_empty_string_is_1() {
assert_eq!(map_apcore_error_to_exit_code(""), 1);
}
#[test]
fn test_set_audit_logger_none_clears_logger() {
set_audit_logger(None);
let guard = AUDIT_LOGGER.lock().unwrap();
assert!(guard.is_none(), "setting None must clear the audit logger");
}
#[test]
fn test_set_audit_logger_some_stores_logger() {
use crate::security::AuditLogger;
set_audit_logger(Some(AuditLogger::new(None)));
let guard = AUDIT_LOGGER.lock().unwrap();
assert!(guard.is_some(), "setting Some must store the audit logger");
drop(guard);
set_audit_logger(None);
}
#[test]
fn test_validate_against_schema_passes_with_no_properties() {
let schema = serde_json::json!({});
let input = std::collections::HashMap::new();
let result = validate_against_schema(&input, &schema);
assert!(result.is_ok(), "empty schema must pass: {result:?}");
}
#[test]
fn test_validate_against_schema_required_field_missing_fails() {
let schema = serde_json::json!({
"properties": {
"a": {"type": "integer"}
},
"required": ["a"]
});
let input: std::collections::HashMap<String, serde_json::Value> =
std::collections::HashMap::new();
let result = validate_against_schema(&input, &schema);
assert!(result.is_err(), "missing required field must fail");
}
#[test]
fn test_validate_against_schema_required_field_present_passes() {
let schema = serde_json::json!({
"properties": {
"a": {"type": "integer"}
},
"required": ["a"]
});
let mut input = std::collections::HashMap::new();
input.insert("a".to_string(), serde_json::json!(42));
let result = validate_against_schema(&input, &schema);
assert!(
result.is_ok(),
"present required field must pass: {result:?}"
);
}
#[test]
fn test_validate_against_schema_no_required_any_input_passes() {
let schema = serde_json::json!({
"properties": {
"x": {"type": "string"}
}
});
let input: std::collections::HashMap<String, serde_json::Value> =
std::collections::HashMap::new();
let result = validate_against_schema(&input, &schema);
assert!(result.is_ok(), "no required fields: empty input must pass");
}
#[test]
fn test_validate_against_schema_type_mismatch_fails() {
let schema = serde_json::json!({
"properties": {
"port": {"type": "integer"}
},
"required": ["port"]
});
let mut input = std::collections::HashMap::new();
input.insert("port".to_string(), serde_json::json!("not_a_number"));
let result = validate_against_schema(&input, &schema);
assert!(result.is_err(), "type mismatch must fail validation");
}
#[test]
fn test_validate_against_schema_enum_violation_fails() {
let schema = serde_json::json!({
"properties": {
"mode": {"type": "string", "enum": ["read", "write"]}
},
"required": ["mode"]
});
let mut input = std::collections::HashMap::new();
input.insert("mode".to_string(), serde_json::json!("delete"));
let result = validate_against_schema(&input, &schema);
assert!(result.is_err(), "enum violation must fail validation");
}
}