srill 1.1.0

Subscribe to Redis and invoke Lambda functions with cargo lambda, for local development.
Documentation
use clap::Parser;
use std::collections::HashMap;

mod invoke;
mod sub;

// Command-line arguments, parsed with clap's derive API.
//
// Plain `//` comments are used for maintainer notes in this struct on purpose:
// the `///` comments on the fields below are picked up by clap as the help
// text shown to users, so editing them changes the program's output.
//
// Name, version, author and description are taken from the Cargo package
// metadata at compile time via `env!`.
#[derive(Debug, clap::Parser)]
#[clap(
    name = env!("CARGO_PKG_NAME"),
    version = env!("CARGO_PKG_VERSION"),
    author = env!("CARGO_PKG_AUTHORS"),
    about = env!("CARGO_PKG_DESCRIPTION"),
    arg_required_else_help = true,
)]
struct Args {
    // `--redis-url`; always has a value because of the default below, so the
    // resolver cannot tell an explicit flag from the default.
    /// Redis URL
    #[clap(long, value_parser, default_value = "redis://localhost:6379")]
    redis_url: String,
    // `--invoke-port`; `None` when the flag is absent so a config file value
    // or the built-in default can be used instead.
    /// Port used by cargo lambda invoke/watch
    #[clap(long, value_parser)]
    invoke_port: Option<u16>,
    // `--channels`; clap splits the value on ',' so each element is one
    // "channel=lambda" entry, parsed later by `parse_channel_pairs`.
    /// Channel-Lambda pairs in format "channel1=lambda1,channel2=lambda2"
    #[clap(long, value_parser, value_delimiter = ',')]
    channels: Option<Vec<String>>,
    // `--config`; when present it takes priority over `--channels` and the
    // positional arguments (see `get_channel_lambda_pairs`).
    /// Config file path (TOML format)
    #[clap(long, value_parser)]
    config: Option<String>,
    // First positional argument (no `long`), only used together with `lambda`.
    /// Redis pub/sub channel name (legacy single channel mode)
    #[clap(value_parser)]
    channel: Option<String>,
    // Second positional argument (no `long`), only used together with `channel`.
    /// Lambda function name (legacy single channel mode)
    #[clap(value_parser)]
    lambda: Option<String>,
}

/// One subscription mapping: messages published on `channel` are forwarded
/// to the Lambda function named `lambda`.
#[derive(Debug, Clone)]
struct ChannelLambdaPair {
    /// Redis pub/sub channel name to subscribe to.
    channel: String,
    /// Name of the Lambda function invoked for each message on `channel`.
    lambda: String,
}

/// Contents of the TOML config file, as deserialized by `load_config`.
#[derive(Debug, serde::Deserialize)]
struct Config {
    /// Optional Redis URL; when present it replaces the URL taken from the
    /// command line (see `get_channel_lambda_pairs`).
    redis_url: Option<String>,
    /// Optional invoke port; used unless `--invoke-port` is given on the
    /// command line.
    invoke_port: Option<u16>,
    /// Required table mapping channel name (key) to Lambda function name
    /// (value). Being a `HashMap`, its iteration order is unspecified.
    channels: HashMap<String, String>,
}

/// Final settings produced by `get_channel_lambda_pairs` after merging the
/// command-line arguments with the optional config file.
#[derive(Debug, Clone)]
struct ResolvedConfig {
    /// Channel-to-Lambda mappings to subscribe to; may be empty, in which
    /// case `main` reports an error.
    pairs: Vec<ChannelLambdaPair>,
    /// Redis URL passed to every subscription.
    redis_url: String,
    /// Port passed to `invoke::invoke`: the `--invoke-port` flag if given,
    /// otherwise the config file value, otherwise 9000.
    invoke_port: u16,
}

/// Parses `channel=lambda` specifications into [`ChannelLambdaPair`] values.
///
/// Each entry must contain exactly one `=` separating a non-empty channel
/// name from a non-empty Lambda function name. Surrounding whitespace on
/// either side is trimmed, so `"a=b, c=d"` split on `,` parses cleanly.
///
/// # Errors
///
/// Returns an error naming the first entry that has no `=`, more than one
/// `=`, or an empty (or whitespace-only) channel or Lambda name.
fn parse_channel_pairs(channels: &[String]) -> anyhow::Result<Vec<ChannelLambdaPair>> {
    channels
        .iter()
        .map(|channel_pair| {
            // `split_once` plus the `contains` guard keeps the original
            // "exactly one '='" rule without allocating a Vec of parts.
            match channel_pair.split_once('=') {
                Some((channel, lambda))
                    if !lambda.contains('=')
                        && !channel.trim().is_empty()
                        && !lambda.trim().is_empty() =>
                {
                    Ok(ChannelLambdaPair {
                        channel: channel.trim().to_string(),
                        lambda: lambda.trim().to_string(),
                    })
                }
                // An empty side would otherwise subscribe to a channel named ""
                // or try to invoke a function named "", so reject it up front.
                _ => Err(anyhow::anyhow!(
                    "Invalid channel-lambda pair format: '{}'. Expected 'channel=lambda'",
                    channel_pair
                )),
            }
        })
        // Collecting into `Result` stops at the first invalid entry.
        .collect()
}

fn load_config(config_path: &str) -> anyhow::Result<Config> {
    let content = std::fs::read_to_string(config_path)?;
    let config: Config = toml::from_str(&content)?;
    Ok(config)
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let args = Args::parse();

    // Keep transport settings separate from channel mappings so callers can
    // override a checked-in config file with a one-off local watch port.
    let resolved = get_channel_lambda_pairs(&args).await?;
    let pairs = resolved.pairs;
    let redis_url = resolved.redis_url;
    let invoke_port = resolved.invoke_port;

    if pairs.is_empty() {
        return Err(anyhow::anyhow!(
            "No channel-lambda pairs specified. Use --channels, --config, or legacy channel/lambda arguments."
        ));
    }

    println!("Starting srill with {} channel(s):", pairs.len());
    println!("Using invoke port: {}", invoke_port);
    for pair in &pairs {
        println!("  {} => {}", pair.channel, pair.lambda);
    }

    // Start subscription tasks for each channel-lambda pair
    let mut tasks = Vec::new();

    for pair in pairs {
        let redis_url = redis_url.clone();
        let task = tokio::spawn(async move {
            let prompt = format!("[srill {} ==> {}]", &pair.channel, &pair.lambda);

            if let Err(e) =
                sub::subscribe(&redis_url, &pair.channel, move |body| match invoke::invoke(
                    &pair.lambda,
                    body,
                    invoke_port,
                ) {
                    Ok(result) => {
                        if result.success {
                            println!("{prompt} success");
                        } else {
                            println!("{prompt} failed");
                        }
                    }
                    Err(e) => {
                        eprintln!("{prompt} Failed to invoke lambda: {e}");
                    }
                })
            {
                eprintln!("Failed to subscribe to channel {}: {}", pair.channel, e);
            }
        });
        tasks.push(task);
    }

    // Wait for all tasks to complete (they should run indefinitely)
    for task in tasks {
        if let Err(e) = task.await {
            eprintln!("Task failed: {e}");
        }
    }

    Ok(())
}

async fn get_channel_lambda_pairs(args: &Args) -> anyhow::Result<ResolvedConfig> {
    let mut pairs = Vec::new();
    let mut redis_url = args.redis_url.clone();
    let mut invoke_port = 9000;

    // Priority 1: Config file
    if let Some(config_path) = &args.config {
        let config = load_config(config_path)?;

        // File-backed defaults make local setups reproducible, but we still let
        // explicit CLI flags win for ad-hoc port changes during development.
        if let Some(config_redis_url) = config.redis_url {
            redis_url = config_redis_url;
        }
        if let Some(config_invoke_port) = config.invoke_port {
            invoke_port = config_invoke_port;
        }

        for (channel, lambda) in config.channels {
            pairs.push(ChannelLambdaPair { channel, lambda });
        }
    } else if let Some(channels) = &args.channels {
        // `--channels` is mutually exclusive with config file channel mappings,
        // but transport flags should remain independently overrideable.
        pairs = parse_channel_pairs(channels)?;
    } else if let (Some(channel), Some(lambda)) = (&args.channel, &args.lambda) {
        pairs.push(ChannelLambdaPair {
            channel: channel.clone(),
            lambda: lambda.clone(),
        });
    }

    if let Some(cli_invoke_port) = args.invoke_port {
        invoke_port = cli_invoke_port;
    }

    Ok(ResolvedConfig {
        pairs,
        redis_url,
        invoke_port,
    })
}