use clap::Parser;
use std::collections::HashMap;
mod invoke;
mod sub;
// Command-line arguments, parsed with clap's derive API.
//
// Plain `//` comments are used here on purpose: clap turns `///` doc comments
// on this struct and its fields into help text, which would change `--help`.
//
// Channel/lambda pairs can be supplied in three ways; `get_channel_lambda_pairs`
// consults them in this order and uses the first one present:
//   1. `--config <file>`
//   2. `--channels a=b,c=d`
//   3. the positional `channel` and `lambda` arguments
//
// `arg_required_else_help = true` asks clap to show help when the program is
// started without any arguments.
#[derive(Debug, clap::Parser)]
#[clap(
name = env!("CARGO_PKG_NAME"),
version = env!("CARGO_PKG_VERSION"),
author = env!("CARGO_PKG_AUTHORS"),
about = env!("CARGO_PKG_DESCRIPTION"),
arg_required_else_help = true,
)]
struct Args {
// Redis connection URL. Always has a value because of the default; a
// `redis_url` entry in the `--config` file replaces it.
#[clap(long, value_parser, default_value = "redis://localhost:6379")]
redis_url: String,
// Port forwarded to `invoke::invoke`. When given, it overrides the config
// file's `invoke_port`; when absent everywhere, 9000 is used.
#[clap(long, value_parser)]
invoke_port: Option<u16>,
// Comma-separated list of `channel=lambda` items, parsed by
// `parse_channel_pairs`. Ignored when `--config` is given.
#[clap(long, value_parser, value_delimiter = ',')]
channels: Option<Vec<String>>,
// Path to a TOML configuration file read by `load_config`.
#[clap(long, value_parser)]
config: Option<String>,
// Legacy positional form: first positional argument, the channel name.
// Only used when both positionals are present and neither `--config` nor
// `--channels` was given.
#[clap(value_parser)]
channel: Option<String>,
// Legacy positional form: second positional argument, the lambda name.
#[clap(value_parser)]
lambda: Option<String>,
}
/// One subscription: messages received on `channel` are handed to `lambda`.
#[derive(Debug, Clone)]
struct ChannelLambdaPair {
/// Channel name passed to `sub::subscribe`.
channel: String,
/// Lambda identifier passed to `invoke::invoke` for every message body.
lambda: String,
}
/// Shape of the TOML file given via `--config`, as deserialized by `load_config`.
#[derive(Debug, serde::Deserialize)]
struct Config {
/// Optional Redis URL; when present it replaces the command-line value.
redis_url: Option<String>,
/// Optional invoke port; `--invoke-port` on the command line takes priority.
invoke_port: Option<u16>,
/// Map of channel name to lambda name. Not optional, so the file is
/// expected to contain a `channels` table.
channels: HashMap<String, String>,
}
/// Effective settings produced by `get_channel_lambda_pairs` after combining
/// the command line, the optional config file and built-in defaults.
#[derive(Debug, Clone)]
struct ResolvedConfig {
/// Subscriptions to start; may be empty (the caller rejects that case).
pairs: Vec<ChannelLambdaPair>,
/// Redis URL every subscription connects to.
redis_url: String,
/// Port handed to `invoke::invoke`.
invoke_port: u16,
}
/// Parses `channel=lambda` specifications into [`ChannelLambdaPair`]s.
///
/// Each item must contain exactly one `=` with a non-empty name on both
/// sides. Items are processed in order and the result preserves that order.
///
/// # Errors
///
/// Returns an error naming the first item that is not of the form
/// `channel=lambda`: no `=`, more than one `=`, or an empty channel or lambda
/// name (for example `=fn` or `chan=`).
fn parse_channel_pairs(channels: &[String]) -> anyhow::Result<Vec<ChannelLambdaPair>> {
    channels
        .iter()
        .map(|spec| match spec.split_once('=') {
            // A second '=' in the remainder is still rejected; an empty name
            // on either side would only fail later at subscribe/invoke time
            // with a far less helpful message, so it is rejected here.
            Some((channel, lambda))
                if !channel.is_empty() && !lambda.is_empty() && !lambda.contains('=') =>
            {
                Ok(ChannelLambdaPair {
                    channel: channel.to_string(),
                    lambda: lambda.to_string(),
                })
            }
            _ => Err(anyhow::anyhow!(
                "Invalid channel-lambda pair format: '{}'. Expected 'channel=lambda'",
                spec
            )),
        })
        // Collecting into `Result` stops at the first invalid item.
        .collect()
}
fn load_config(config_path: &str) -> anyhow::Result<Config> {
let content = std::fs::read_to_string(config_path)?;
let config: Config = toml::from_str(&content)?;
Ok(config)
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let args = Args::parse();
let resolved = get_channel_lambda_pairs(&args).await?;
let pairs = resolved.pairs;
let redis_url = resolved.redis_url;
let invoke_port = resolved.invoke_port;
if pairs.is_empty() {
return Err(anyhow::anyhow!(
"No channel-lambda pairs specified. Use --channels, --config, or legacy channel/lambda arguments."
));
}
println!("Starting srill with {} channel(s):", pairs.len());
println!("Using invoke port: {}", invoke_port);
for pair in &pairs {
println!(" {} => {}", pair.channel, pair.lambda);
}
let mut tasks = Vec::new();
for pair in pairs {
let redis_url = redis_url.clone();
let task = tokio::spawn(async move {
let prompt = format!("[srill {} ==> {}]", &pair.channel, &pair.lambda);
if let Err(e) =
sub::subscribe(&redis_url, &pair.channel, move |body| match invoke::invoke(
&pair.lambda,
body,
invoke_port,
) {
Ok(result) => {
if result.success {
println!("{prompt} success");
} else {
println!("{prompt} failed");
}
}
Err(e) => {
eprintln!("{prompt} Failed to invoke lambda: {e}");
}
})
{
eprintln!("Failed to subscribe to channel {}: {}", pair.channel, e);
}
});
tasks.push(task);
}
for task in tasks {
if let Err(e) = task.await {
eprintln!("Task failed: {e}");
}
}
Ok(())
}
async fn get_channel_lambda_pairs(args: &Args) -> anyhow::Result<ResolvedConfig> {
let mut pairs = Vec::new();
let mut redis_url = args.redis_url.clone();
let mut invoke_port = 9000;
if let Some(config_path) = &args.config {
let config = load_config(config_path)?;
if let Some(config_redis_url) = config.redis_url {
redis_url = config_redis_url;
}
if let Some(config_invoke_port) = config.invoke_port {
invoke_port = config_invoke_port;
}
for (channel, lambda) in config.channels {
pairs.push(ChannelLambdaPair { channel, lambda });
}
} else if let Some(channels) = &args.channels {
pairs = parse_channel_pairs(channels)?;
} else if let (Some(channel), Some(lambda)) = (&args.channel, &args.lambda) {
pairs.push(ChannelLambdaPair {
channel: channel.clone(),
lambda: lambda.clone(),
});
}
if let Some(cli_invoke_port) = args.invoke_port {
invoke_port = cli_invoke_port;
}
Ok(ResolvedConfig {
pairs,
redis_url,
invoke_port,
})
}