use clap::{Parser, Subcommand};
use pgqrs::config::Config;
use pgqrs::types::{QueueMessage, QueueRecord};
use std::fs::File;
use std::process;
mod output;
use crate::output::{JsonOutputWriter, OutputWriter, TableOutputWriter};
// Top-level command line of the `pgqrs` binary: one subcommand plus global options.
// The field doc comments below are rendered by clap as `--help` text.
#[derive(Parser)]
#[command(name = "pgqrs")]
#[command(about = "A PostgreSQL-backed job queue CLI")]
#[command(version)]
struct Cli {
    /// Command group to run
    #[command(subcommand)]
    command: Commands,
    /// Connection string (DSN) override passed to the configuration loader
    #[arg(long, short = 'd')]
    dsn: Option<String>,
    /// Schema override passed to the configuration loader
    #[arg(long, short = 's')]
    schema: Option<String>,
    /// Configuration source to load settings from
    // NOTE(review): presumably a config file path; confirm against `Config::load_with_schema_options`.
    #[arg(long, short = 'c')]
    config: Option<String>,
    /// Log destination: `stderr`, or a file path to create
    #[arg(long, default_value = "stderr")]
    log_dest: String,
    /// Log level: error, warn, info, debug or trace (case-insensitive; unknown values fall back to info)
    #[arg(long, default_value = "info")]
    log_level: String,
    /// Output format: `json` (case-insensitive); any other value renders a table
    #[arg(long, default_value = "table")]
    format: String,
    /// Output destination: `stdout`, or a file path to create
    #[arg(long, default_value = "stdout")]
    out: String,
}
// Top-level command groups. Each variant only wraps the nested subcommand enum that carries the
// actual operation. Kept as a plain comment so it is not picked up as help text for the parent
// command; the variant doc comments are what clap shows in `--help`.
#[derive(Subcommand)]
enum Commands {
    /// Administrative operations: install, verify, stats, reclaim
    Admin {
        #[command(subcommand)]
        admin_command: AdminCommands,
    },
    /// Queue operations: create, list, inspect, delete, purge, metrics
    Queue {
        #[command(subcommand)]
        queue_command: QueueCommands,
    },
    /// Worker operations: list, inspect, suspend, resume, shutdown, purge, health
    Worker {
        #[command(subcommand)]
        worker_command: WorkerCommands,
    },
}
// Operations under `pgqrs admin`, dispatched by `handle_admin_commands`.
// Plain comment on purpose: only the variant/field doc comments should become help text.
#[derive(Subcommand)]
pub enum AdminCommands {
    /// Initialize the pgqrs schema
    Install,
    /// Verify the pgqrs installation
    Verify,
    /// Show system statistics
    Stats,
    /// Reclaim messages from zombie workers of a queue
    Reclaim {
        /// Name of the queue to reclaim messages for
        #[arg(long, short = 'q')]
        queue: String,
        /// Optional duration in humantime syntax (e.g. `5m`, `2h`) forwarded to the reclaim operation
        #[arg(long)]
        older_than: Option<String>,
    },
}
// Operations under `pgqrs queue`, dispatched by `handle_queue_commands`.
// Plain comment on purpose: only the variant/field doc comments should become help text.
#[derive(Subcommand)]
pub enum QueueCommands {
    /// Create a queue
    Create {
        /// Name of the queue to create
        name: String,
    },
    /// List all queues
    List,
    /// Show a single queue
    Get {
        /// Name of the queue to look up
        name: String,
    },
    /// List the messages of a queue
    Messages {
        /// Name of the queue whose messages are listed
        name: String,
    },
    /// Move dead letter queue messages to the archive
    ArchiveDlq,
    /// Delete a queue
    Delete {
        /// Name of the queue to delete
        name: String,
    },
    /// Purge a queue
    Purge {
        /// Name of the queue to purge
        name: String,
    },
    /// Show metrics for one queue, or for all queues when no name is given
    Metrics {
        /// Name of the queue; omit to report every queue
        name: Option<String>,
    },
}
// Operations under `pgqrs worker`, dispatched by `handle_worker_commands`.
// Plain comment on purpose: only the variant/field doc comments should become help text.
#[derive(Subcommand)]
pub enum WorkerCommands {
    /// List workers, optionally only those of one queue
    List {
        /// Name of the queue to restrict the listing to
        #[arg(long, short = 'q')]
        queue: Option<String>,
    },
    /// Show a single worker
    Get {
        /// Worker ID
        id: i64,
    },
    /// List the messages of a worker
    Messages {
        /// Worker ID
        id: i64,
    },
    /// Release the messages of a worker
    ReleaseMessages {
        /// Worker ID
        id: i64,
    },
    /// Suspend a worker
    Suspend {
        /// Worker ID
        id: i64,
    },
    /// Resume a worker
    Resume {
        /// Worker ID
        id: i64,
    },
    /// Shut down a worker
    Shutdown {
        /// Worker ID
        id: i64,
    },
    /// Purge workers older than a given duration
    Purge {
        /// Duration in humantime syntax (e.g. `7d`, `12h`)
        #[arg(long, default_value = "7d")]
        older_than: String,
    },
    /// Delete a worker
    Delete {
        /// Worker ID
        id: i64,
    },
    /// Update the heartbeat of a worker
    Heartbeat {
        /// Worker ID
        id: i64,
    },
    /// Show worker statistics for a queue
    Stats {
        /// Name of the queue
        queue: String,
    },
    /// Check worker health
    Health {
        /// Timeout in seconds
        #[arg(long, default_value = "60")]
        timeout: u64,
        /// Group the results by queue
        #[arg(long)]
        group_by_queue: bool,
    },
}
#[tokio::main]
async fn main() {
let cli = Cli::parse();
let level = match cli.log_level.to_lowercase().as_str() {
"error" => tracing::Level::ERROR,
"warn" => tracing::Level::WARN,
"info" => tracing::Level::INFO,
"debug" => tracing::Level::DEBUG,
"trace" => tracing::Level::TRACE,
other => {
eprintln!("Unknown log level '{}', defaulting to INFO", other);
tracing::Level::INFO
}
};
let writer: Box<dyn Fn() -> Box<dyn std::io::Write + Send> + Send + Sync> =
if cli.log_dest == "stderr" {
Box::new(|| Box::new(std::io::stderr()))
} else {
let file = std::fs::File::create(&cli.log_dest).expect("Failed to create log file");
Box::new(move || Box::new(file.try_clone().expect("Failed to clone log file")))
};
let subscriber = tracing_subscriber::FmtSubscriber::builder()
.with_max_level(level)
.with_writer(writer)
.finish();
tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");
if let Err(e) = run_cli(cli).await {
tracing::error!("Error: {}", e);
process::exit(1);
}
}
async fn run_cli(cli: Cli) -> anyhow::Result<()> {
let config = Config::load_with_schema_options(cli.dsn, cli.schema, cli.config)
.map_err(|e| anyhow::anyhow!("Failed to load configuration: {}", e))?;
#[cfg(feature = "s3")]
let mut store = pgqrs::connect_with_config(&config).await?;
#[cfg(not(feature = "s3"))]
let store = pgqrs::connect_with_config(&config).await?;
#[cfg(feature = "s3")]
if let pgqrs::store::AnyStore::S3(s3_store) = &mut store {
s3_store.snapshot().await?;
}
let writer = match cli.format.to_lowercase().as_str() {
"json" => OutputWriter::Json(JsonOutputWriter),
_ => OutputWriter::Table(TableOutputWriter),
};
let mut out_writer: Box<dyn std::io::Write> = match cli.out.as_str() {
"stdout" => Box::new(std::io::stdout()),
_ => Box::new(File::create(&cli.out)?),
};
let out: &mut dyn std::io::Write = out_writer.as_mut();
match cli.command {
Commands::Admin { admin_command } => {
handle_admin_commands(&store, admin_command, writer, out).await?
}
Commands::Worker { worker_command } => {
handle_worker_commands(&store, worker_command, writer, out).await?
}
Commands::Queue { queue_command } => {
handle_queue_commands(&store, queue_command, writer, out).await?
}
}
Ok(())
}
/// Runs one `admin` subcommand against `store`.
///
/// `Stats` is rendered through `writer`; `Reclaim` writes a one-line summary to `out`;
/// `Install` and `Verify` only log their progress.
///
/// # Errors
///
/// Returns an error when a store operation fails, when `older_than` is not a valid humantime
/// duration or is out of range, or when writing the output fails.
pub async fn handle_admin_commands(
    store: &impl pgqrs::store::Store,
    command: AdminCommands,
    writer: OutputWriter,
    out: &mut dyn std::io::Write,
) -> anyhow::Result<()> {
    match command {
        AdminCommands::Install => {
            tracing::info!("Initializing pgqrs schema ...");
            store.bootstrap().await?;
            tracing::info!("Initialization completed successfully");
        }
        AdminCommands::Verify => {
            tracing::info!("Verifying pgqrs installation...");
            pgqrs::admin(store).verify().await?;
            tracing::info!("Verification completed successfully");
        }
        AdminCommands::Stats => {
            tracing::info!("Getting system statistics...");
            let system_stats = pgqrs::admin(store).system_stats().await?;
            writer.write_item(&system_stats, out)?;
        }
        AdminCommands::Reclaim { queue, older_than } => {
            tracing::info!("Reclaiming messages for queue '{}'...", queue);
            // The queue is resolved before the duration is parsed, so an unknown queue is
            // reported ahead of a malformed duration.
            let target_queue = pgqrs::tables(store).queues().get_by_name(&queue).await?;
            let threshold = older_than
                .map(|raw| -> anyhow::Result<chrono::Duration> {
                    let parsed = raw
                        .parse::<humantime::Duration>()
                        .map_err(|e| anyhow::anyhow!("Invalid duration format '{}': {}", raw, e))?;
                    chrono::Duration::from_std(parsed.into())
                        .map_err(|e| anyhow::anyhow!("Duration too large: {}", e))
                })
                .transpose()?;
            let reclaimed = pgqrs::admin(store)
                .reclaim_messages(target_queue.id, threshold)
                .await?;
            tracing::info!("Reclaimed {} messages from zombie workers", reclaimed);
            writeln!(out, "Reclaimed {} messages from zombie workers", reclaimed)?;
        }
    }
    Ok(())
}
/// Runs one `queue` subcommand against `store`.
///
/// Results of `Create`, `List`, `Get`, `Messages`, `ArchiveDlq` and `Metrics` are rendered
/// through `writer` into `out`; `Delete` and `Purge` only log their progress.
///
/// # Errors
///
/// Returns an error when a store operation fails or when writing the output fails.
pub async fn handle_queue_commands(
    store: &impl pgqrs::store::Store,
    command: QueueCommands,
    writer: OutputWriter,
    out: &mut dyn std::io::Write,
) -> anyhow::Result<()> {
    match command {
        QueueCommands::Create { name } => {
            tracing::info!("Creating queue '{}' ...", name);
            let created = store.queue(&name).await?;
            writer.write_item(&created, out)?;
        }
        QueueCommands::List => {
            tracing::info!("Listing all queues...");
            let all_queues: Vec<QueueRecord> = pgqrs::tables(store).queues().list().await?;
            writer.write_list(&all_queues, out)?;
        }
        QueueCommands::Get { name } => {
            tracing::info!("Getting queue '{}'...", name);
            let record = store.queues().get_by_name(&name).await?;
            writer.write_item(&record, out)?;
        }
        QueueCommands::Messages { name } => {
            tracing::info!("Listing messages for queue '{}'...", name);
            // Messages are looked up by the queue's id, so the name is resolved first.
            let record = store.queues().get_by_name(&name).await?;
            let queue_messages: Vec<QueueMessage> = pgqrs::tables(store)
                .messages()
                .filter_by_fk(record.id)
                .await?;
            writer.write_list(&queue_messages, out)?;
        }
        QueueCommands::ArchiveDlq => {
            tracing::info!("Moving dead letter queue messages to archive");
            let archived_ids = pgqrs::admin(store).dlq().await?;
            tracing::info!("Moved {} messages from DLQ to archive", archived_ids.len());
            writer.write_list(&archived_ids, out)?;
        }
        QueueCommands::Delete { name } => {
            tracing::info!("Deleting queue '{}'...", name);
            let record = store.queues().get_by_name(&name).await?;
            pgqrs::admin(store).delete_queue(&record).await?;
            tracing::info!("Queue '{}' deleted successfully", name);
        }
        QueueCommands::Purge { name } => {
            tracing::info!("Purging queue '{}'...", name);
            pgqrs::admin(store).purge_queue(&name).await?;
            tracing::info!("Queue '{}' purged successfully", name);
        }
        QueueCommands::Metrics { name } => match name {
            // A name selects a single queue and yields one item.
            Some(queue_name) => {
                tracing::info!("Getting metrics for queue '{}'...", queue_name);
                let queue_metrics = pgqrs::admin(store).queue_metrics(&queue_name).await?;
                writer.write_item(&queue_metrics, out)?;
            }
            // Without a name every queue is reported, as a list.
            None => {
                tracing::info!("Getting metrics for all queues...");
                let all_metrics = pgqrs::admin(store).all_queues_metrics().await?;
                writer.write_list(&all_metrics, out)?;
            }
        },
    }
    Ok(())
}
/// Runs one `worker` subcommand against `store`.
///
/// Listings and lookups (`List`, `Get`, `Messages`, `Health`) are rendered through `writer`;
/// the other commands write plain text lines to `out`. Progress is logged via `tracing`.
///
/// # Errors
///
/// Returns an error when a store operation fails (including a failed `Delete`), when a duration
/// or timeout cannot be parsed or is out of range, or when writing the output fails.
/// Deleting a worker that does not exist is reported on `out` but is not an error.
pub async fn handle_worker_commands(
    store: &impl pgqrs::store::Store,
    command: WorkerCommands,
    writer: OutputWriter,
    out: &mut dyn std::io::Write,
) -> anyhow::Result<()> {
    match command {
        WorkerCommands::List { queue } => {
            let workers = match queue {
                Some(queue_name) => {
                    tracing::info!("Listing workers for queue '{}'...", queue_name);
                    // Workers are filtered by queue id, so the name is resolved first.
                    let queue_id = pgqrs::tables(store)
                        .queues()
                        .get_by_name(&queue_name)
                        .await?
                        .id;
                    pgqrs::tables(store)
                        .workers()
                        .filter_by_fk(queue_id)
                        .await?
                }
                None => {
                    tracing::info!("Listing all workers...");
                    pgqrs::tables(store).workers().list().await?
                }
            };
            tracing::info!("Found {} workers", workers.len());
            writer.write_list(&workers, out)?;
        }
        WorkerCommands::Get { id } => {
            // NOTE: every lookup failure is reported as "not found"; the underlying error is
            // discarded here.
            let worker = pgqrs::tables(store)
                .workers()
                .get(id)
                .await
                .map_err(|_| anyhow::anyhow!("Worker with ID {} not found", id))?;
            writer.write_item(&worker, out)?;
        }
        WorkerCommands::Messages { id } => {
            // The worker is fetched first so a missing worker is reported before any messages
            // are queried. As in `Get`, the underlying error is discarded.
            let worker_info = pgqrs::tables(store)
                .workers()
                .get(id)
                .await
                .map_err(|_| anyhow::anyhow!("Worker with ID {} not found", id))?;
            tracing::info!("Getting messages for worker {}...", id);
            let messages = pgqrs::admin(store)
                .get_worker_messages(worker_info.id)
                .await?;
            tracing::info!("Found {} messages", messages.len());
            writer.write_list(&messages, out)?;
        }
        WorkerCommands::ReleaseMessages { id } => {
            tracing::info!("Releasing messages from worker {}...", id);
            let released_count = pgqrs::admin(store).release_worker_messages(id).await?;
            tracing::info!("Released {} messages", released_count);
            writeln!(
                out,
                "Released {} messages from worker {}",
                released_count, id
            )?;
        }
        WorkerCommands::Suspend { id } => {
            tracing::info!("Suspending worker {}...", id);
            let worker_handler = store.worker(id).await?;
            worker_handler.suspend().await?;
            tracing::info!("Worker {} suspended", id);
            writeln!(out, "Worker {} suspended", id)?;
        }
        WorkerCommands::Resume { id } => {
            tracing::info!("Resuming worker {}...", id);
            let worker_handler = store.worker(id).await?;
            worker_handler.resume().await?;
            tracing::info!("Worker {} resumed", id);
            writeln!(out, "Worker {} resumed", id)?;
        }
        WorkerCommands::Shutdown { id } => {
            tracing::info!("Shutting down worker {}...", id);
            let worker_handler = store.worker(id).await?;
            worker_handler.shutdown().await?;
            tracing::info!("Worker {} shut down successfully", id);
            writeln!(out, "Worker {} shut down successfully", id)?;
        }
        WorkerCommands::Heartbeat { id } => {
            tracing::info!("Updating heartbeat for worker {}...", id);
            let worker = store.worker(id).await?;
            worker.heartbeat().await?;
            tracing::info!("Heartbeat updated for worker {}", id);
            writeln!(out, "Heartbeat updated for worker {}", id)?;
        }
        WorkerCommands::Purge { older_than } => {
            // humantime parses the user string; chrono then range-checks the value.
            let duration = chrono::Duration::from_std(
                older_than
                    .parse::<humantime::Duration>()
                    .map_err(|e| {
                        anyhow::anyhow!("Invalid duration format '{}': {}", older_than, e)
                    })?
                    .into(),
            )
            .map_err(|e| anyhow::anyhow!("Duration too large: {}", e))?;
            tracing::info!("Purging workers older than {:?}...", duration);
            let purged_count = pgqrs::admin(store).purge_old_workers(duration).await?;
            tracing::info!("Purged {} old workers", purged_count);
            writeln!(out, "Purged {} old workers", purged_count)?;
        }
        WorkerCommands::Delete { id } => {
            tracing::info!("Deleting worker {}...", id);
            match pgqrs::admin(store).delete_worker(id).await {
                Ok(deleted_count) if deleted_count > 0 => {
                    tracing::info!("Deleted worker {}", id);
                    writeln!(out, "Worker {} deleted successfully", id)?;
                }
                Ok(_) => {
                    tracing::warn!("Worker {} not found", id);
                    writeln!(out, "Worker {} not found", id)?;
                }
                Err(e) => {
                    // Keep the message on `out` as before, but also return the failure so the
                    // process exits non-zero instead of reporting success.
                    writeln!(out, "Error: {}", e)?;
                    return Err(anyhow::anyhow!("Failed to delete worker {}: {}", id, e));
                }
            }
        }
        WorkerCommands::Stats { queue } => {
            tracing::info!("Getting worker statistics for queue '{}'...", queue);
            let stats = pgqrs::admin(store).worker_stats(&queue).await?;
            tracing::info!("Worker statistics retrieved");
            // This report is always plain text; it does not go through `writer`.
            writeln!(out, "Worker Statistics for Queue '{}':", queue)?;
            writeln!(out, " Total Workers: {}", stats.total_workers)?;
            writeln!(out, " Ready Workers: {}", stats.ready_workers)?;
            writeln!(out, " Polling Workers: {}", stats.polling_workers)?;
            writeln!(out, " Interrupted Workers: {}", stats.interrupted_workers)?;
            writeln!(out, " Suspended Workers: {}", stats.suspended_workers)?;
            writeln!(out, " Stopped Workers: {}", stats.stopped_workers)?;
            writeln!(
                out,
                " Average Messages per Worker: {:.2}",
                stats.average_messages_per_worker
            )?;
            writeln!(out, " Oldest Worker Age: {:?}", stats.oldest_worker_age)?;
            writeln!(
                out,
                " Newest Heartbeat Age: {:?}",
                stats.newest_heartbeat_age
            )?;
        }
        WorkerCommands::Health {
            timeout,
            group_by_queue,
        } => {
            tracing::info!(
                "Checking worker health (timeout: {}s, grouped: {})...",
                timeout,
                group_by_queue
            );
            // A plain `as i64` cast would turn very large values negative; convert through
            // `std::time::Duration` so an out-of-range timeout is rejected instead.
            let timeout_duration =
                chrono::Duration::from_std(std::time::Duration::from_secs(timeout))
                    .map_err(|e| anyhow::anyhow!("Timeout too large: {}", e))?;
            let stats = pgqrs::admin(store)
                .worker_health_stats(timeout_duration, group_by_queue)
                .await?;
            writer.write_list(&stats, out)?;
        }
    }
    Ok(())
}