agent-first-pay 0.7.0

A payment tool for AI agents — send and receive across five networks through one interface, with spending limits you control.
Documentation
use crate::args::CliRequest;
use crate::config;
use crate::handler::{self, App};
use crate::output_fmt;
#[cfg(feature = "rpc")]
use crate::provider::remote;
use crate::store;
use crate::types::*;
use agent_first_data::OutputFormat;
use std::io::Write as _;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tokio::sync::mpsc;

/// Capacity of the bounded channel that carries `Output` values from the
/// request handler to the printing loop in `run`.
const OUTPUT_CHANNEL_CAPACITY: usize = 4096;

pub(super) async fn run(req: CliRequest) {
    let CliRequest {
        input,
        output: output_format,
        log,
        data_dir,
        rpc_endpoint: _,
        rpc_secret: _,
        startup_argv,
        startup_args,
        startup_requested,
        dry_run,
    } = req;

    if dry_run {
        let params = serde_json::to_value(&input).unwrap_or(serde_json::Value::Null);
        let command = params
            .get("code")
            .and_then(|v| v.as_str())
            .unwrap_or("unknown")
            .to_string();
        let dry = Output::DryRun {
            id: request_id_for_tracking(&input).map(str::to_string),
            command,
            params,
            trace: Trace::from_duration(0),
        };
        emit_output(&dry, output_format);
        return;
    }

    let resolved_dir = data_dir.unwrap_or_else(|| RuntimeConfig::default().data_dir);
    let mut config = match RuntimeConfig::load_from_dir(&resolved_dir) {
        Ok(config) => config,
        Err(error) => {
            emit_cli_error(&error, output_format);
            std::process::exit(1);
        }
    };
    if !log.is_empty() {
        config.log = log.clone();
    }

    let (tx, mut rx) = mpsc::channel::<Output>(OUTPUT_CHANNEL_CAPACITY);
    let store = store::create_storage_backend(&config);
    let app = Arc::new(App::new(config, tx, None, store));

    let cfg = app.config.read().await;
    if let Some(event) = config::maybe_startup_log(
        &log,
        startup_requested,
        Some(startup_argv),
        Some(&*cfg),
        startup_args,
    ) {
        emit_output(&event, output_format);
    }
    drop(cfg);

    app.requests_total.fetch_add(1, Ordering::Relaxed);
    handler::dispatch(&app, input).await;

    drop(app);

    let mut had_error = false;
    while let Some(out) = rx.recv().await {
        if matches!(out, Output::Error { .. }) {
            had_error = true;
        }
        if let Output::Log { ref event, .. } = out {
            if !log_event_enabled(&log, event) {
                continue;
            }
        }
        emit_output(&out, output_format);
    }

    std::process::exit(if had_error { 1 } else { 0 });
}

/// Forwards a single CLI request to a remote RPC endpoint, prints the returned
/// outputs, and ends the process with the resulting exit status.
///
/// The local runtime configuration is loaded on a best-effort basis and is
/// only passed to `config::maybe_startup_log`; a configuration that fails to
/// load is treated as absent. The endpoint and secret are obtained from
/// `remote::require_remote_args`.
///
/// The process exits with status 1 when `remote::emit_remote_outputs` reports
/// an error and with 0 otherwise.
#[cfg(feature = "rpc")]
pub(super) async fn run_remote(req: CliRequest) {
    // NOTE(review): `dry_run` is not consulted here, unlike in `run` — confirm
    // that the caller never routes dry-run requests to this function.
    let CliRequest {
        input,
        output: output_format,
        log,
        data_dir,
        rpc_endpoint,
        rpc_secret,
        startup_argv,
        startup_args,
        startup_requested,
        ..
    } = req;

    let resolved_dir = data_dir.unwrap_or_else(|| RuntimeConfig::default().data_dir);
    // A load failure is deliberately discarded: the config is optional here.
    let config = RuntimeConfig::load_from_dir(&resolved_dir).ok();

    let startup_event = config::maybe_startup_log(
        &log,
        startup_requested,
        Some(startup_argv),
        config.as_ref(),
        startup_args,
    );
    if let Some(event) = startup_event {
        emit_output(&event, output_format);
    }

    let (endpoint, secret) = remote::require_remote_args(
        rpc_endpoint.as_deref(),
        rpc_secret.as_deref(),
        output_format,
    );

    let mut outputs = remote::rpc_call(endpoint, secret, &input).await;
    remote::wrap_remote_limit_topology(&mut outputs, endpoint);
    let had_error = remote::emit_remote_outputs(&outputs, output_format, &log);
    std::process::exit(i32::from(had_error));
}

/// Prints a CLI error with message `msg` and no hint, rendered in `format`.
///
/// Shorthand for `emit_cli_error_hint(msg, None, format)`.
pub(super) fn emit_cli_error(msg: &str, format: OutputFormat) {
    emit_cli_error_hint(msg, None, format);
}

/// Prints a CLI error with message `msg` and an optional `hint`.
///
/// The error value is built with `agent_first_data::build_cli_error`, rendered
/// in `format` with `agent_first_data::cli_output`, and written to stdout as
/// one line. A failed write is ignored.
pub(super) fn emit_cli_error_hint(msg: &str, hint: Option<&str>, format: OutputFormat) {
    let rendered =
        agent_first_data::cli_output(&agent_first_data::build_cli_error(msg, hint), format);
    let stdout = std::io::stdout();
    let mut handle = stdout.lock();
    // Best effort: there is nowhere left to report a stdout failure.
    let _ = writeln!(handle, "{rendered}");
}

/// Reports whether `event` is selected by the log filters in `log`.
///
/// An empty filter list selects nothing. Otherwise the event is selected when
/// any filter is exactly `"*"` or `"all"`, or when the ASCII-lowercased event
/// name starts with the filter. Filters are compared as given; only the event
/// name is lowercased.
pub(super) fn log_event_enabled(log: &[String], event: &str) -> bool {
    if log.is_empty() {
        return false;
    }
    let event_lower = event.to_ascii_lowercase();
    for filter in log {
        match filter.as_str() {
            "*" | "all" => return true,
            prefix if event_lower.starts_with(prefix) => return true,
            _ => {}
        }
    }
    false
}

/// Prints one `Output` to stdout as a single line rendered in `format`.
///
/// The output is first serialized to a JSON value; if serialization fails a
/// JSON `null` is rendered instead. Rendering goes through
/// `output_fmt::render_value_with_policy`. A failed write is ignored.
pub(super) fn emit_output(out: &Output, format: OutputFormat) {
    // `serde_json::Value::default()` is `Value::Null`.
    let json = serde_json::to_value(out).unwrap_or_default();
    let line = output_fmt::render_value_with_policy(&json, format);
    let stdout = std::io::stdout();
    let mut handle = stdout.lock();
    let _ = writeln!(handle, "{line}");
}

/// Returns the request id carried by `input`, if this function tracks one for
/// its variant.
///
/// `Config`, `ConfigShow`, `Version` and `Close` yield `None`; every other
/// variant yields its `id` field as a string slice. The match has no wildcard
/// arm, so adding an `Input` variant forces a decision here at compile time.
pub(super) fn request_id_for_tracking(input: &Input) -> Option<&str> {
    let id = match input {
        Input::Config(_) | Input::ConfigShow { .. } | Input::Version | Input::Close => {
            return None;
        }
        Input::WalletCreate { id, .. }
        | Input::LnWalletCreate { id, .. }
        | Input::WalletClose { id, .. }
        | Input::WalletList { id, .. }
        | Input::Balance { id, .. }
        | Input::Receive { id, .. }
        | Input::ReceiveClaim { id, .. }
        | Input::CashuSend { id, .. }
        | Input::CashuReceive { id, .. }
        | Input::Send { id, .. }
        | Input::Restore { id, .. }
        | Input::WalletShowSeed { id, .. }
        | Input::HistoryList { id, .. }
        | Input::HistoryStatus { id, .. }
        | Input::HistoryUpdate { id, .. }
        | Input::LimitAdd { id, .. }
        | Input::LimitRemove { id, .. }
        | Input::LimitList { id, .. }
        | Input::LimitSet { id, .. }
        | Input::WalletConfigShow { id, .. }
        | Input::WalletConfigSet { id, .. }
        | Input::WalletConfigTokenAdd { id, .. }
        | Input::WalletConfigTokenRemove { id, .. } => id,
    };
    Some(id.as_str())
}