use clap::{Args, Subcommand};
use forge_core::error::Result;
#[derive(Args)]
pub struct WebhookCommand {
#[command(subcommand)]
command: WebhookSubcommand,
}
#[derive(Subcommand)]
enum WebhookSubcommand {
Replay(ReplayArgs),
List(ListArgs),
}
#[derive(Args)]
struct ReplayArgs {
webhook_name: String,
idempotency_key: String,
#[arg(long, default_value = "http://localhost:3000")]
base_url: String,
}
#[derive(Args)]
struct ListArgs {
#[arg(long)]
name: Option<String>,
#[arg(long)]
status: Option<String>,
#[arg(long, default_value = "20")]
limit: i64,
}
impl WebhookCommand {
pub async fn execute(self) -> Result<()> {
match self.command {
WebhookSubcommand::Replay(args) => replay(args).await,
WebhookSubcommand::List(args) => list(args).await,
}
}
}
#[allow(clippy::disallowed_methods)]
async fn replay(args: ReplayArgs) -> Result<()> {
let config = forge_core::config::ForgeConfig::from_file("forge.toml")?;
let pool = sqlx::PgPool::connect(&config.database.url)
.await
.map_err(forge_core::ForgeError::Database)?;
let row: Option<(Option<Vec<u8>>, Option<serde_json::Value>, String)> = sqlx::query_as(
"SELECT raw_body, raw_headers, status \
FROM forge_webhook_events \
WHERE webhook_name = $1 AND idempotency_key = $2",
)
.bind(&args.webhook_name)
.bind(&args.idempotency_key)
.fetch_optional(&pool)
.await
.map_err(forge_core::ForgeError::Database)?;
let (raw_body, raw_headers, status) = match row {
Some(r) => r,
None => {
println!(
"No webhook event found for {} / {}",
args.webhook_name, args.idempotency_key
);
return Ok(());
}
};
let body = match raw_body {
Some(b) => b,
None => {
println!(
"Webhook event found (status: {status}) but raw body was not stored. \
Replay data is only available for events captured by a runtime that \
stores raw_body/raw_headers."
);
return Ok(());
}
};
println!(
"Replaying {} / {} (status: {}, {} bytes)",
args.webhook_name,
args.idempotency_key,
status,
body.len()
);
sqlx::query(
"DELETE FROM forge_webhook_events \
WHERE webhook_name = $1 AND idempotency_key = $2",
)
.bind(&args.webhook_name)
.bind(&args.idempotency_key)
.execute(&pool)
.await
.map_err(forge_core::ForgeError::Database)?;
let client = reqwest::Client::new();
let mut request = client.post(format!(
"{}/webhooks/{}",
args.base_url.trim_end_matches('/'),
args.webhook_name
));
if let Some(headers) = raw_headers
&& let Some(obj) = headers.as_object()
{
for (key, value) in obj {
if let Some(v) = value.as_str()
&& let Ok(name) = reqwest::header::HeaderName::from_bytes(key.as_bytes())
&& let Ok(val) = reqwest::header::HeaderValue::from_str(v)
{
request = request.header(name, val);
}
}
}
let response =
request.body(body).send().await.map_err(|e| {
forge_core::ForgeError::internal_with("Failed to send replay request", e)
})?;
let status_code = response.status();
let reason = status_code.canonical_reason().unwrap_or_default();
println!("Response: {} {}", status_code.as_u16(), reason);
let body = response
.text()
.await
.map_err(|e| forge_core::ForgeError::internal_with("Failed to read response body", e))?;
if !body.is_empty() {
println!("{body}");
}
Ok(())
}
#[allow(clippy::disallowed_methods)]
async fn list(args: ListArgs) -> Result<()> {
let config = forge_core::config::ForgeConfig::from_file("forge.toml")?;
let pool = sqlx::PgPool::connect(&config.database.url)
.await
.map_err(forge_core::ForgeError::Database)?;
let rows: Vec<(String, String, String, chrono::DateTime<chrono::Utc>, bool)> =
if let Some(ref name) = args.name {
sqlx::query_as(
"SELECT webhook_name, idempotency_key, status, processed_at, \
raw_body IS NOT NULL as has_body \
FROM forge_webhook_events \
WHERE webhook_name = $1 \
ORDER BY processed_at DESC \
LIMIT $2",
)
.bind(name)
.bind(args.limit)
.fetch_all(&pool)
.await
.map_err(forge_core::ForgeError::Database)?
} else {
sqlx::query_as(
"SELECT webhook_name, idempotency_key, status, processed_at, \
raw_body IS NOT NULL as has_body \
FROM forge_webhook_events \
ORDER BY processed_at DESC \
LIMIT $1",
)
.bind(args.limit)
.fetch_all(&pool)
.await
.map_err(forge_core::ForgeError::Database)?
};
if rows.is_empty() {
println!("No webhook events found.");
return Ok(());
}
println!(
"{:<20} {:<30} {:<10} {:<24} REPLAY",
"WEBHOOK", "IDEMPOTENCY_KEY", "STATUS", "PROCESSED_AT"
);
for (webhook, key, status, processed_at, has_body) in &rows {
let replay = if *has_body { "yes" } else { "no" };
println!(
"{:<20} {:<30} {:<10} {:<24} {}",
webhook,
if key.len() > 28 {
key.get(..28).unwrap_or_default()
} else {
key.as_str()
},
status,
processed_at.format("%Y-%m-%d %H:%M:%S UTC"),
replay
);
}
Ok(())
}