use anyhow::Result;
use hmac::{Hmac, Mac};
use sha2::Sha256;
use subtle::ConstantTimeEq;
/// HMAC instantiated with SHA-256, shared by the verifiers and helpers in this file.
type HmacSha256 = Hmac<Sha256>;
pub fn verify_signature(
provider: &str,
secret: &str,
payload: &[u8],
headers: &axum::http::HeaderMap,
) -> Result<bool> {
match provider {
"stripe" => verify_stripe(secret, payload, headers),
"github" => verify_github(secret, payload, headers),
"shopify" => verify_shopify(secret, payload, headers),
"hmac" => verify_custom_hmac(secret, payload, headers),
"pagerduty" => verify_pagerduty(secret, payload, headers),
"grafana" => verify_grafana(secret, payload, headers),
"terraform" => verify_terraform(secret, payload, headers),
"gitlab" => verify_gitlab(secret, payload, headers),
"standard-webhooks" => verify_standard_webhooks(secret, payload, headers),
"linear" => verify_linear(secret, payload, headers),
"twilio" => verify_twilio(secret, payload, headers),
"paddle" => verify_paddle(secret, payload, headers),
_ => anyhow::bail!("Unknown verification provider: {provider}"),
}
}
/// Verifies a Stripe webhook using the `Stripe-Signature` header.
///
/// The header is a comma-separated list containing a `t=<unix seconds>`
/// element and one or more `v1=<hex HMAC-SHA256>` elements. The MAC is
/// computed over `"<t>.<payload>"` with `secret` as the key.
///
/// Returns `Ok(false)` when the header is missing or malformed, when the
/// timestamp is more than 300 seconds away from the current time, or when
/// none of the `v1` signatures matches.
fn verify_stripe(secret: &str, payload: &[u8], headers: &axum::http::HeaderMap) -> Result<bool> {
    const TOLERANCE_SECS: u64 = 300;

    let sig_header = headers
        .get("Stripe-Signature")
        .and_then(|v| v.to_str().ok())
        .unwrap_or("");

    let mut timestamp = "";
    // Several `v1` entries may be present (e.g. while a secret is being
    // rolled), so every candidate is kept rather than only the last one.
    let mut signatures: Vec<&str> = Vec::new();
    for part in sig_header.split(',') {
        let part = part.trim();
        if let Some(t) = part.strip_prefix("t=") {
            timestamp = t;
        } else if let Some(v) = part.strip_prefix("v1=") {
            if !v.is_empty() {
                signatures.push(v);
            }
        }
    }
    if timestamp.is_empty() || signatures.is_empty() {
        return Ok(false);
    }

    let Ok(ts) = timestamp.parse::<i64>() else {
        return Ok(false);
    };
    let now = chrono::Utc::now().timestamp();
    // `abs_diff` cannot overflow, unlike `(now - ts).abs()`, for extreme
    // timestamps supplied by the sender.
    if now.abs_diff(ts) > TOLERANCE_SECS {
        tracing::warn!(
            timestamp = ts,
            now = now,
            "Stripe signature timestamp too old or too far in the future"
        );
        return Ok(false);
    }

    let mut mac = HmacSha256::new_from_slice(secret.as_bytes()).expect("HMAC key length");
    mac.update(timestamp.as_bytes());
    mac.update(b".");
    mac.update(payload);
    let expected = hex::encode(mac.finalize().into_bytes());

    // Compare against every candidate without short-circuiting on a match.
    let matched = signatures
        .iter()
        .fold(false, |acc, candidate| acc | constant_time_eq(&expected, candidate));
    Ok(matched)
}
/// Verifies a GitHub webhook using the `X-Hub-Signature-256` header.
///
/// The header must have the form `sha256=<hex>`; the hex part is compared
/// with the hex HMAC-SHA256 of `payload` keyed by `secret`. A missing header,
/// a missing prefix or an empty digest yields `Ok(false)`.
fn verify_github(secret: &str, payload: &[u8], headers: &axum::http::HeaderMap) -> Result<bool> {
    let provided = headers
        .get("X-Hub-Signature-256")
        .and_then(|value| value.to_str().ok())
        .and_then(|value| value.strip_prefix("sha256="))
        .filter(|digest| !digest.is_empty());

    match provided {
        Some(digest) => {
            let computed = compute_hmac_sha256_hex(secret.as_bytes(), payload);
            Ok(constant_time_eq(&computed, digest))
        }
        None => Ok(false),
    }
}
/// Verifies a Shopify webhook using the `X-Shopify-Hmac-SHA256` header.
///
/// The header value is compared with the base64 HMAC-SHA256 of `payload`
/// keyed by `secret`. A missing or empty header yields `Ok(false)`.
fn verify_shopify(secret: &str, payload: &[u8], headers: &axum::http::HeaderMap) -> Result<bool> {
    let Some(provided) = headers
        .get("X-Shopify-Hmac-SHA256")
        .and_then(|value| value.to_str().ok())
        .filter(|value| !value.is_empty())
    else {
        return Ok(false);
    };

    let computed = compute_hmac_sha256_base64(secret.as_bytes(), payload);
    Ok(constant_time_eq(&computed, provided))
}
/// Verifies a generic HMAC webhook using the `X-Webhook-Signature` header.
///
/// The header value is compared with the hex HMAC-SHA256 of `payload` keyed
/// by `secret`. A missing or empty header yields `Ok(false)`.
fn verify_custom_hmac(
    secret: &str,
    payload: &[u8],
    headers: &axum::http::HeaderMap,
) -> Result<bool> {
    let provided = headers
        .get("X-Webhook-Signature")
        .and_then(|value| value.to_str().ok())
        .unwrap_or_default();

    let verdict = if provided.is_empty() {
        false
    } else {
        let computed = compute_hmac_sha256_hex(secret.as_bytes(), payload);
        constant_time_eq(&computed, provided)
    };
    Ok(verdict)
}
/// Verifies a PagerDuty webhook using the `X-PagerDuty-Signature` header.
///
/// The header is treated as a comma-separated list of `v1=<hex>` entries.
/// The request is accepted when any entry equals the hex HMAC-SHA256 of
/// `payload` keyed by `secret`. Entries without the `v1=` prefix are ignored;
/// a missing header or a header without usable entries yields `Ok(false)`.
fn verify_pagerduty(secret: &str, payload: &[u8], headers: &axum::http::HeaderMap) -> Result<bool> {
    let sig_header = headers
        .get("X-PagerDuty-Signature")
        .and_then(|v| v.to_str().ok())
        .unwrap_or("");

    // More than one signature can be listed (one per active signing secret),
    // so each entry is checked individually instead of treating the whole
    // remainder of the header as a single signature.
    let candidates: Vec<&str> = sig_header
        .split(',')
        .filter_map(|entry| entry.trim().strip_prefix("v1="))
        .filter(|sig| !sig.is_empty())
        .collect();
    if candidates.is_empty() {
        return Ok(false);
    }

    let expected = compute_hmac_sha256_hex(secret.as_bytes(), payload);
    // Evaluate every candidate without short-circuiting on a match.
    let matched = candidates
        .iter()
        .fold(false, |acc, candidate| acc | constant_time_eq(&expected, candidate));
    Ok(matched)
}
/// Verifies a Grafana alerting webhook using the
/// `X-Grafana-Alerting-Signature` header.
///
/// The header value is compared with the hex HMAC-SHA256 of `payload` keyed
/// by `secret`. A missing or empty header yields `Ok(false)`.
fn verify_grafana(secret: &str, payload: &[u8], headers: &axum::http::HeaderMap) -> Result<bool> {
    let provided = headers
        .get("X-Grafana-Alerting-Signature")
        .and_then(|value| value.to_str().ok())
        .filter(|value| !value.is_empty());

    match provided {
        None => Ok(false),
        Some(signature) => {
            let computed = compute_hmac_sha256_hex(secret.as_bytes(), payload);
            Ok(constant_time_eq(&computed, signature))
        }
    }
}
/// Verifies a Terraform notification using the
/// `X-TFE-Notification-Signature` header.
///
/// The header value is compared with the hex HMAC-SHA512 of `payload` keyed
/// by `secret`. A missing or empty header yields `Ok(false)`.
fn verify_terraform(secret: &str, payload: &[u8], headers: &axum::http::HeaderMap) -> Result<bool> {
    let Some(provided) = headers
        .get("X-TFE-Notification-Signature")
        .and_then(|value| value.to_str().ok())
        .filter(|value| !value.is_empty())
    else {
        return Ok(false);
    };

    let computed = compute_hmac_sha512_hex(secret.as_bytes(), payload);
    Ok(constant_time_eq(&computed, provided))
}
/// Verifies a GitLab webhook by comparing the `X-Gitlab-Token` header with
/// `secret`.
///
/// The payload is not involved in the check. A missing or empty header
/// yields `Ok(false)`.
fn verify_gitlab(secret: &str, _payload: &[u8], headers: &axum::http::HeaderMap) -> Result<bool> {
    let presented = headers
        .get("X-Gitlab-Token")
        .and_then(|value| value.to_str().ok())
        .unwrap_or_default();

    let accepted = !presented.is_empty() && constant_time_eq(secret, presented);
    Ok(accepted)
}
/// Verifies a webhook signed with the `webhook-id`, `webhook-timestamp` and
/// `webhook-signature` headers.
///
/// The MAC is HMAC-SHA256 over `"<id>.<timestamp>.<payload>"`, base64
/// encoded. When `secret` starts with `whsec_` the remainder is base64
/// decoded to obtain the key; if decoding fails, or there is no prefix, the
/// raw bytes of `secret` are used. The signature header is a space-separated
/// list of `v1,<base64>` entries and any matching entry is accepted.
///
/// Returns `Ok(false)` when a header is missing or empty, the timestamp is
/// not an integer or is more than 300 seconds from the current time, or no
/// entry matches.
fn verify_standard_webhooks(
    secret: &str,
    payload: &[u8],
    headers: &axum::http::HeaderMap,
) -> Result<bool> {
    use base64::Engine;

    const TOLERANCE_SECS: u64 = 300;

    /// Returns the named header as text when it is present and non-empty.
    fn non_empty_header<'h>(headers: &'h axum::http::HeaderMap, name: &str) -> Option<&'h str> {
        headers
            .get(name)
            .and_then(|v| v.to_str().ok())
            .filter(|v| !v.is_empty())
    }

    let Some(msg_id) = non_empty_header(headers, "webhook-id") else {
        return Ok(false);
    };
    let Some(timestamp_str) = non_empty_header(headers, "webhook-timestamp") else {
        return Ok(false);
    };
    let Some(sig_header) = non_empty_header(headers, "webhook-signature") else {
        return Ok(false);
    };

    let timestamp: i64 = match timestamp_str.parse() {
        Ok(ts) => ts,
        Err(_) => return Ok(false),
    };
    let now = chrono::Utc::now().timestamp();
    // `abs_diff` cannot overflow, unlike `(now - timestamp).abs()`, for
    // extreme timestamps supplied by the sender.
    if now.abs_diff(timestamp) > TOLERANCE_SECS {
        tracing::warn!(
            timestamp,
            now,
            "Standard Webhooks signature timestamp too old or too far in the future"
        );
        return Ok(false);
    }

    let key_bytes = if let Some(stripped) = secret.strip_prefix("whsec_") {
        base64::engine::general_purpose::STANDARD
            .decode(stripped)
            .unwrap_or_else(|_| secret.as_bytes().to_vec())
    } else {
        secret.as_bytes().to_vec()
    };

    let mut mac = HmacSha256::new_from_slice(&key_bytes).expect("HMAC accepts any key length");
    mac.update(msg_id.as_bytes());
    mac.update(b".");
    // The timestamp is signed exactly as it appeared in the header.
    mac.update(timestamp_str.as_bytes());
    mac.update(b".");
    mac.update(payload);
    let expected = base64::engine::general_purpose::STANDARD.encode(mac.finalize().into_bytes());

    for sig_entry in sig_header.split(' ') {
        if let Some(sig_b64) = sig_entry.strip_prefix("v1,") {
            if constant_time_eq(&expected, sig_b64) {
                return Ok(true);
            }
        }
    }
    Ok(false)
}
/// Verifies a Linear webhook using the `Linear-Signature` header.
///
/// The header value is compared with the hex HMAC-SHA256 of `payload` keyed
/// by `secret`. A missing or empty header yields `Ok(false)`.
fn verify_linear(secret: &str, payload: &[u8], headers: &axum::http::HeaderMap) -> Result<bool> {
    let provided = headers
        .get("Linear-Signature")
        .and_then(|value| value.to_str().ok())
        .unwrap_or_default();

    if provided.is_empty() {
        Ok(false)
    } else {
        let computed = compute_hmac_sha256_hex(secret.as_bytes(), payload);
        Ok(constant_time_eq(&computed, provided))
    }
}
/// Verifies a Twilio webhook using the `X-Twilio-Signature` header.
///
/// The header value is compared with the base64 HMAC-SHA1 of `payload` keyed
/// by `secret`. A missing or empty header yields `Ok(false)`.
///
/// NOTE(review): only the raw body is signed here. Twilio's documented scheme
/// appears to sign the request URL together with the sorted form parameters —
/// confirm that this matches the requests this endpoint receives.
fn verify_twilio(secret: &str, payload: &[u8], headers: &axum::http::HeaderMap) -> Result<bool> {
    use base64::Engine;
    type HmacSha1 = Hmac<sha1::Sha1>;

    let Some(provided) = headers
        .get("X-Twilio-Signature")
        .and_then(|value| value.to_str().ok())
        .filter(|value| !value.is_empty())
    else {
        return Ok(false);
    };

    let mut hmac =
        HmacSha1::new_from_slice(secret.as_bytes()).expect("HMAC-SHA1 accepts any key length");
    hmac.update(payload);
    let tag = hmac.finalize().into_bytes();
    let computed = base64::engine::general_purpose::STANDARD.encode(tag);

    Ok(constant_time_eq(&computed, provided))
}
/// Verifies a Paddle webhook using the `Paddle-Signature` header.
///
/// The header is a semicolon-separated list containing a `ts=<unix seconds>`
/// element and one or more `h1=<hex HMAC-SHA256>` elements. The MAC is
/// computed over `"<ts>:<payload>"` with `secret` as the key.
///
/// Returns `Ok(false)` when the header is missing or malformed, when the
/// timestamp is more than 300 seconds away from the current time, or when
/// none of the `h1` signatures matches.
fn verify_paddle(secret: &str, payload: &[u8], headers: &axum::http::HeaderMap) -> Result<bool> {
    const TOLERANCE_SECS: u64 = 300;

    let sig_header = headers
        .get("Paddle-Signature")
        .and_then(|v| v.to_str().ok())
        .unwrap_or("");

    let mut timestamp = "";
    // Keep every `h1` entry so that a header carrying more than one signature
    // is accepted when any of them is valid, not only the last one listed.
    let mut signatures: Vec<&str> = Vec::new();
    for part in sig_header.split(';') {
        let part = part.trim();
        if let Some(t) = part.strip_prefix("ts=") {
            timestamp = t;
        } else if let Some(h) = part.strip_prefix("h1=") {
            if !h.is_empty() {
                signatures.push(h);
            }
        }
    }
    if timestamp.is_empty() || signatures.is_empty() {
        return Ok(false);
    }

    let Ok(ts) = timestamp.parse::<i64>() else {
        return Ok(false);
    };
    let now = chrono::Utc::now().timestamp();
    // `abs_diff` cannot overflow, unlike `(now - ts).abs()`, for extreme
    // timestamps supplied by the sender.
    if now.abs_diff(ts) > TOLERANCE_SECS {
        tracing::warn!(
            timestamp = ts,
            now = now,
            "Paddle signature timestamp too old or too far in the future"
        );
        return Ok(false);
    }

    let mut mac = HmacSha256::new_from_slice(secret.as_bytes()).expect("HMAC key length");
    mac.update(timestamp.as_bytes());
    mac.update(b":");
    mac.update(payload);
    let expected = hex::encode(mac.finalize().into_bytes());

    // Compare against every candidate without short-circuiting on a match.
    let matched = signatures
        .iter()
        .fold(false, |acc, candidate| acc | constant_time_eq(&expected, candidate));
    Ok(matched)
}
/// Returns the HMAC-SHA256 of `data` under `key`, hex encoded via `hex::encode`.
fn compute_hmac_sha256_hex(key: &[u8], data: &[u8]) -> String {
    let mut hmac = HmacSha256::new_from_slice(key).expect("HMAC key length");
    hmac.update(data);
    let tag = hmac.finalize().into_bytes();
    hex::encode(tag)
}
/// Returns the HMAC-SHA512 of `data` under `key`, hex encoded via `hex::encode`.
fn compute_hmac_sha512_hex(key: &[u8], data: &[u8]) -> String {
    use hmac::Mac;
    type HmacSha512 = Hmac<sha2::Sha512>;

    let mut hmac = HmacSha512::new_from_slice(key).expect("HMAC key length");
    hmac.update(data);
    let tag = hmac.finalize().into_bytes();
    hex::encode(tag)
}
/// Returns the HMAC-SHA256 of `data` under `key`, encoded with the standard
/// base64 engine.
fn compute_hmac_sha256_base64(key: &[u8], data: &[u8]) -> String {
    use base64::Engine;

    let mut hmac = HmacSha256::new_from_slice(key).expect("HMAC key length");
    hmac.update(data);
    let tag = hmac.finalize().into_bytes();
    base64::engine::general_purpose::STANDARD.encode(tag)
}
/// Compares two strings byte-wise using `subtle`'s `ct_eq`.
///
/// Strings of different length are reported unequal immediately, so only the
/// content comparison of equal-length inputs goes through `ct_eq`.
fn constant_time_eq(a: &str, b: &str) -> bool {
    let (lhs, rhs) = (a.as_bytes(), b.as_bytes());
    lhs.len() == rhs.len() && bool::from(lhs.ct_eq(rhs))
}
use serde::Deserialize;
/// An Amazon SNS message as deserialized from its JSON form.
///
/// Each field is mapped to its JSON key through `serde(rename = ...)`.
/// The `Option` fields are marked `default`, so a missing key deserializes
/// to `None`.
#[derive(Debug, Deserialize)]
pub struct SnsMessage {
/// JSON key `Type`; `build_sns_string_to_sign` recognises `Notification`,
/// `SubscriptionConfirmation` and `UnsubscribeConfirmation`.
#[serde(rename = "Type")]
pub message_type: String,
/// JSON key `MessageId`.
#[serde(rename = "MessageId")]
pub message_id: String,
/// JSON key `Message`.
#[serde(rename = "Message")]
pub message: String,
/// JSON key `Timestamp`, kept as the original text because it is part of
/// the string that gets signed.
#[serde(rename = "Timestamp")]
pub timestamp: String,
/// JSON key `TopicArn`.
#[serde(rename = "TopicArn")]
pub topic_arn: String,
/// JSON key `Signature`; base64 text that `verify_sns_message` decodes.
#[serde(rename = "Signature")]
pub signature: String,
/// JSON key `SigningCertURL`; the URL `verify_sns_message` fetches the
/// signing certificate from.
#[serde(rename = "SigningCertURL")]
pub signing_cert_url: String,
/// JSON key `SignatureVersion`; `verify_sns_message` handles `"1"`
/// (SHA-1) and `"2"` (SHA-256).
#[serde(rename = "SignatureVersion")]
pub signature_version: String,
/// JSON key `Subject`, optional.
#[serde(rename = "Subject", default)]
pub subject: Option<String>,
/// JSON key `SubscribeURL`, optional.
#[serde(rename = "SubscribeURL", default)]
pub subscribe_url: Option<String>,
/// JSON key `Token`, optional.
#[serde(rename = "Token", default)]
pub token: Option<String>,
/// JSON key `UnsubscribeURL`, optional.
#[serde(rename = "UnsubscribeURL", default)]
pub unsubscribe_url: Option<String>,
}
use std::sync::Mutex;
use std::time::{Duration, Instant};
/// Mutex-guarded map from certificate URL to the downloaded bytes and the
/// instant at which they were stored.
type CertCache = Mutex<HashMap<String, (Vec<u8>, Instant)>>;
/// Global certificate cache, lazily initialised to an empty map.
static SNS_CERT_CACHE: std::sync::LazyLock<CertCache> =
std::sync::LazyLock::new(|| Mutex::new(HashMap::new()));
use std::collections::HashMap;
/// Maximum age (3600 seconds) at which `get_cached_cert` still returns an entry.
const SNS_CERT_CACHE_TTL: Duration = Duration::from_secs(3600);
/// Returns a copy of the cached certificate bytes for `url`, provided an
/// entry exists and is younger than `SNS_CERT_CACHE_TTL`.
///
/// A poisoned mutex is recovered rather than propagated. Expired entries are
/// left in the map.
fn get_cached_cert(url: &str) -> Option<Vec<u8>> {
    let guard = SNS_CERT_CACHE
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());

    let hit = match guard.get(url) {
        Some((bytes, stored_at)) if stored_at.elapsed() < SNS_CERT_CACHE_TTL => {
            Some(bytes.clone())
        }
        _ => None,
    };
    hit
}
/// Stores `data` in the certificate cache under `url`, stamped with the
/// current instant and replacing any previous entry for that URL.
///
/// A poisoned mutex is recovered rather than propagated.
fn set_cached_cert(url: &str, data: Vec<u8>) {
    let mut guard = SNS_CERT_CACHE
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    let _replaced = guard.insert(url.to_string(), (data, Instant::now()));
}
/// Verifies the RSA signature of an SNS message.
///
/// The signing certificate is taken from the in-process cache or downloaded
/// from `msg.signing_cert_url` (which must pass [`is_valid_sns_url`]). Its
/// RSA public key is then used to check `msg.signature` (base64) against the
/// string produced by [`build_sns_string_to_sign`], with PKCS#1 v1.5 and
/// SHA-1 for `SignatureVersion` `"1"` or SHA-256 for `"2"`.
///
/// Returns `Ok(false)` when the certificate URL is rejected, the signature
/// version is unknown, or the signature does not verify.
///
/// # Errors
///
/// Returns an error when the certificate download fails or answers with a
/// non-success HTTP status, when the certificate or its public key cannot be
/// parsed, or when the signature is not valid base64 or is otherwise
/// malformed.
pub async fn verify_sns_message(msg: &SnsMessage, http: &reqwest::Client) -> Result<bool> {
    use base64::Engine;
    use pkcs1::DecodeRsaPublicKey;
    use rsa::pkcs1v15::{Signature, VerifyingKey};
    use rsa::signature::Verifier;

    let cert_url = &msg.signing_cert_url;
    if !is_valid_sns_url(cert_url) {
        tracing::warn!(url = cert_url, "Invalid SNS SigningCertURL");
        return Ok(false);
    }

    let cached = get_cached_cert(cert_url);
    let freshly_fetched = cached.is_none();
    let pem_data: Vec<u8> = match cached {
        Some(bytes) => bytes,
        None => http
            .get(cert_url)
            .send()
            .await?
            // Treat 4xx/5xx answers as failures rather than as certificate data.
            .error_for_status()?
            .bytes()
            .await?
            .to_vec(),
    };

    let (_, pem) = x509_parser::pem::parse_x509_pem(&pem_data)
        .map_err(|e| anyhow::anyhow!("PEM parse error: {:?}", e))?;
    let (_, cert) = x509_parser::parse_x509_certificate(&pem.contents)
        .map_err(|e| anyhow::anyhow!("X.509 parse error: {:?}", e))?;
    let key_data = cert.tbs_certificate.subject_pki.subject_public_key.data;
    let public_key = rsa::RsaPublicKey::from_pkcs1_der(&key_data)
        .map_err(|e| anyhow::anyhow!("RSA key parse error: {e}"))?;

    // Cache only after the download has been parsed into a usable key, so
    // that error pages or garbage are not served from the cache afterwards.
    if freshly_fetched {
        set_cached_cert(cert_url, pem_data);
    }

    let string_to_sign = build_sns_string_to_sign(msg);
    let sig_bytes = base64::engine::general_purpose::STANDARD.decode(&msg.signature)?;
    let sig = Signature::try_from(sig_bytes.as_slice())
        .map_err(|e| anyhow::anyhow!("Invalid signature: {e}"))?;

    let valid = match msg.signature_version.as_str() {
        "1" => {
            let vk = VerifyingKey::<sha1::Sha1>::new(public_key);
            vk.verify(string_to_sign.as_bytes(), &sig).is_ok()
        }
        "2" => {
            let vk = VerifyingKey::<sha2::Sha256>::new(public_key);
            vk.verify(string_to_sign.as_bytes(), &sig).is_ok()
        }
        _ => {
            tracing::warn!(
                version = msg.signature_version,
                "Unknown SNS SignatureVersion"
            );
            false
        }
    };
    Ok(valid)
}
/// Returns `true` when `url` is an HTTPS URL whose host is exactly
/// `sns.<region>.amazonaws.com` and which has a path component.
///
/// The host is everything between `https://` and the first `/`. It must
/// consist of precisely the four labels `sns`, a region label, `amazonaws`
/// and `com`. Because the region label is restricted to lowercase words and
/// a trailing number, hosts containing user-info (`@`), ports, fragments,
/// query markers or other punctuation are rejected instead of being handed
/// to the HTTP client, which would interpret them differently. Hosts of
/// other AWS services (for example `sns.s3.amazonaws.com`) are rejected too.
pub fn is_valid_sns_url(url: &str) -> bool {
    let Some(rest) = url.strip_prefix("https://") else {
        return false;
    };
    // A path separator is required; a bare host is not accepted.
    let Some((host, _path)) = rest.split_once('/') else {
        return false;
    };

    let labels: Vec<&str> = host.split('.').collect();
    match labels.as_slice() {
        ["sns", region, "amazonaws", "com"] => is_aws_region_label(region),
        _ => false,
    }
}

/// Returns `true` for labels shaped like an AWS region name: two or more
/// lowercase ASCII words followed by a number, joined by `-`
/// (e.g. `us-east-1`, `us-gov-west-1`).
fn is_aws_region_label(label: &str) -> bool {
    let segments: Vec<&str> = label.split('-').collect();
    let Some((number, words)) = segments.split_last() else {
        return false;
    };

    words.len() >= 2
        && !number.is_empty()
        && number.bytes().all(|b| b.is_ascii_digit())
        && words
            .iter()
            .all(|word| !word.is_empty() && word.bytes().all(|b| b.is_ascii_lowercase()))
}
pub fn build_sns_string_to_sign(msg: &SnsMessage) -> String {
let mut lines = String::with_capacity(512);
match msg.message_type.as_str() {
"Notification" => {
lines.push_str("Message\n");
lines.push_str(&msg.message);
lines.push('\n');
lines.push_str("MessageId\n");
lines.push_str(&msg.message_id);
lines.push('\n');
if let Some(ref subject) = msg.subject {
lines.push_str("Subject\n");
lines.push_str(subject);
lines.push('\n');
}
lines.push_str("Timestamp\n");
lines.push_str(&msg.timestamp);
lines.push('\n');
lines.push_str("TopicArn\n");
lines.push_str(&msg.topic_arn);
lines.push('\n');
lines.push_str("Type\n");
lines.push_str(&msg.message_type);
lines.push('\n');
}
"SubscriptionConfirmation" | "UnsubscribeConfirmation" => {
lines.push_str("Message\n");
lines.push_str(&msg.message);
lines.push('\n');
lines.push_str("MessageId\n");
lines.push_str(&msg.message_id);
lines.push('\n');
if let Some(ref url) = msg.subscribe_url {
lines.push_str("SubscribeURL\n");
lines.push_str(url);
lines.push('\n');
}
lines.push_str("Timestamp\n");
lines.push_str(&msg.timestamp);
lines.push('\n');
if let Some(ref token) = msg.token {
lines.push_str("Token\n");
lines.push_str(token);
lines.push('\n');
}
lines.push_str("TopicArn\n");
lines.push_str(&msg.topic_arn);
lines.push('\n');
lines.push_str("Type\n");
lines.push_str(&msg.message_type);
lines.push('\n');
}
_ => {}
}
lines
}
/// Produces the base64 HMAC-SHA256 signature for an outbound webhook.
///
/// The signed content is `"<msg_id>.<timestamp>.<payload>"`. When `secret`
/// starts with `whsec_` the remainder is base64 decoded to obtain the key;
/// if decoding fails, or there is no prefix, the raw bytes of `secret` are
/// used as the key.
pub fn sign_outbound_payload(secret: &str, msg_id: &str, timestamp: i64, payload: &[u8]) -> String {
    use base64::Engine;

    let b64 = &base64::engine::general_purpose::STANDARD;

    let signing_key: Vec<u8> = secret
        .strip_prefix("whsec_")
        .and_then(|encoded| b64.decode(encoded).ok())
        .unwrap_or_else(|| secret.as_bytes().to_vec());

    // Everything that precedes the payload in the signed content.
    let signed_prefix = format!("{msg_id}.{timestamp}.");

    let mut hmac = HmacSha256::new_from_slice(&signing_key).expect("HMAC accepts any key length");
    hmac.update(signed_prefix.as_bytes());
    hmac.update(payload);
    let tag = hmac.finalize().into_bytes();

    b64.encode(tag)
}
/// Generates a new signing secret of the form `whsec_<base64>`.
///
/// The base64 part encodes the SHA-256 digest of the bytes of two freshly
/// created ULIDs.
///
/// NOTE(review): the secret's unpredictability rests entirely on the random
/// component of `ulid::Ulid::new()` — confirm that source is suitable for
/// key material.
pub fn generate_signing_secret() -> String {
    use base64::Engine;
    use sha2::Digest;

    let first = ulid::Ulid::new();
    let second = ulid::Ulid::new();

    let mut digest_state = Sha256::new();
    for id in [first, second] {
        digest_state.update(id.to_bytes());
    }
    let digest = digest_state.finalize();

    let encoded = base64::engine::general_purpose::STANDARD.encode(digest);
    format!("whsec_{}", encoded)
}
#[cfg(test)]
mod tests {
use super::*;
use axum::http::HeaderMap;
#[test]
fn test_github_signature_valid() {
let secret = "mysecret";
let payload = b"hello world";
let expected = compute_hmac_sha256_hex(secret.as_bytes(), payload);
let mut headers = HeaderMap::new();
headers.insert(
"X-Hub-Signature-256",
format!("sha256={expected}").parse().unwrap(),
);
assert!(verify_github(secret, payload, &headers).unwrap());
}
#[test]
fn test_github_signature_invalid() {
let mut headers = HeaderMap::new();
headers.insert(
"X-Hub-Signature-256",
"sha256=0000000000000000000000000000000000000000000000000000000000000000"
.parse()
.unwrap(),
);
assert!(!verify_github("secret", b"payload", &headers).unwrap());
}
#[test]
fn test_github_signature_missing() {
let headers = HeaderMap::new();
assert!(!verify_github("secret", b"payload", &headers).unwrap());
}
#[test]
fn test_stripe_signature_valid() {
let secret = "whsec_test";
let payload = b"{\"id\":\"evt_123\"}";
let timestamp = chrono::Utc::now().timestamp().to_string();
let signed = format!("{timestamp}.{}", String::from_utf8_lossy(payload));
let sig = compute_hmac_sha256_hex(secret.as_bytes(), signed.as_bytes());
let mut headers = HeaderMap::new();
headers.insert(
"Stripe-Signature",
format!("t={timestamp},v1={sig}").parse().unwrap(),
);
assert!(verify_stripe(secret, payload, &headers).unwrap());
}
#[test]
fn test_stripe_signature_expired() {
let secret = "whsec_test";
let payload = b"{\"id\":\"evt_123\"}";
let timestamp = (chrono::Utc::now().timestamp() - 600).to_string();
let signed = format!("{timestamp}.{}", String::from_utf8_lossy(payload));
let sig = compute_hmac_sha256_hex(secret.as_bytes(), signed.as_bytes());
let mut headers = HeaderMap::new();
headers.insert(
"Stripe-Signature",
format!("t={timestamp},v1={sig}").parse().unwrap(),
);
assert!(!verify_stripe(secret, payload, &headers).unwrap());
}
#[test]
fn test_stripe_signature_future() {
let secret = "whsec_test";
let payload = b"{\"id\":\"evt_123\"}";
let timestamp = (chrono::Utc::now().timestamp() + 600).to_string();
let signed = format!("{timestamp}.{}", String::from_utf8_lossy(payload));
let sig = compute_hmac_sha256_hex(secret.as_bytes(), signed.as_bytes());
let mut headers = HeaderMap::new();
headers.insert(
"Stripe-Signature",
format!("t={timestamp},v1={sig}").parse().unwrap(),
);
assert!(!verify_stripe(secret, payload, &headers).unwrap());
}
#[test]
fn test_shopify_signature_valid() {
let secret = "shopify_secret";
let payload = b"{\"topic\":\"orders/create\"}";
let expected = compute_hmac_sha256_base64(secret.as_bytes(), payload);
let mut headers = HeaderMap::new();
headers.insert("X-Shopify-Hmac-SHA256", expected.parse().unwrap());
assert!(verify_shopify(secret, payload, &headers).unwrap());
}
#[test]
fn test_custom_hmac_valid() {
let secret = "my_secret";
let payload = b"data";
let expected = compute_hmac_sha256_hex(secret.as_bytes(), payload);
let mut headers = HeaderMap::new();
headers.insert("X-Webhook-Signature", expected.parse().unwrap());
assert!(verify_custom_hmac(secret, payload, &headers).unwrap());
}
#[test]
fn test_verify_signature_unknown_provider() {
let headers = HeaderMap::new();
assert!(verify_signature("unknown", "secret", b"data", &headers).is_err());
}
#[test]
fn test_constant_time_eq() {
assert!(constant_time_eq("abc", "abc"));
assert!(!constant_time_eq("abc", "abd"));
assert!(!constant_time_eq("abc", "abcd"));
}
#[test]
fn test_sns_cert_url_valid() {
assert!(is_valid_sns_url(
"https://sns.us-east-1.amazonaws.com/SimpleNotificationService-abc123.pem"
));
assert!(is_valid_sns_url(
"https://sns.ap-northeast-1.amazonaws.com/cert.pem"
));
}
#[test]
fn test_sns_cert_url_invalid() {
assert!(!is_valid_sns_url(
"http://sns.us-east-1.amazonaws.com/cert.pem"
)); assert!(!is_valid_sns_url("https://evil.com/cert.pem")); assert!(!is_valid_sns_url("https://sns.us-east-1.evil.com/cert.pem")); assert!(!is_valid_sns_url("https://amazonaws.com/cert.pem")); assert!(!is_valid_sns_url(
"https://sns.us-east-1.amazonaws.com.attacker.com/cert.pem"
)); }
#[test]
fn test_sns_notification_string_to_sign() {
let msg = SnsMessage {
message_type: "Notification".into(),
message_id: "msg-123".into(),
message: "Hello".into(),
timestamp: "2024-01-01T00:00:00.000Z".into(),
topic_arn: "arn:aws:sns:us-east-1:123456:my-topic".into(),
signature: String::new(),
signing_cert_url: String::new(),
signature_version: "1".into(),
subject: None,
subscribe_url: None,
token: None,
unsubscribe_url: None,
};
let result = build_sns_string_to_sign(&msg);
assert_eq!(
result,
"Message\nHello\nMessageId\nmsg-123\nTimestamp\n2024-01-01T00:00:00.000Z\nTopicArn\narn:aws:sns:us-east-1:123456:my-topic\nType\nNotification\n"
);
}
#[test]
fn test_sns_notification_with_subject() {
let msg = SnsMessage {
message_type: "Notification".into(),
message_id: "msg-456".into(),
message: "Body".into(),
timestamp: "2024-01-01T00:00:00.000Z".into(),
topic_arn: "arn:aws:sns:us-east-1:123456:topic".into(),
signature: String::new(),
signing_cert_url: String::new(),
signature_version: "1".into(),
subject: Some("My Subject".into()),
subscribe_url: None,
token: None,
unsubscribe_url: None,
};
let result = build_sns_string_to_sign(&msg);
assert!(result.contains("Subject\nMy Subject\n"));
}
#[test]
fn test_sns_subscription_confirmation_string_to_sign() {
let msg = SnsMessage {
message_type: "SubscriptionConfirmation".into(),
message_id: "msg-789".into(),
message: "You have chosen to subscribe".into(),
timestamp: "2024-01-01T00:00:00.000Z".into(),
topic_arn: "arn:aws:sns:us-east-1:123456:topic".into(),
signature: String::new(),
signing_cert_url: String::new(),
signature_version: "1".into(),
subject: None,
subscribe_url: Some(
"https://sns.us-east-1.amazonaws.com/?Action=ConfirmSubscription".into(),
),
token: Some("token-abc".into()),
unsubscribe_url: None,
};
let result = build_sns_string_to_sign(&msg);
assert!(result.contains(
"SubscribeURL\nhttps://sns.us-east-1.amazonaws.com/?Action=ConfirmSubscription\n"
));
assert!(result.contains("Token\ntoken-abc\n"));
assert!(result.contains("Type\nSubscriptionConfirmation\n"));
}
#[test]
fn test_sns_message_deserialization() {
let json = r#"{
"Type": "Notification",
"MessageId": "id-123",
"TopicArn": "arn:aws:sns:us-east-1:123:topic",
"Subject": "test",
"Message": "{\"key\":\"value\"}",
"Timestamp": "2024-01-01T00:00:00.000Z",
"SignatureVersion": "1",
"Signature": "base64sig==",
"SigningCertURL": "https://sns.us-east-1.amazonaws.com/cert.pem"
}"#;
let msg: SnsMessage = serde_json::from_str(json).unwrap();
assert_eq!(msg.message_type, "Notification");
assert_eq!(msg.message_id, "id-123");
assert_eq!(msg.subject, Some("test".into()));
assert_eq!(msg.message, "{\"key\":\"value\"}");
}
#[test]
fn test_sns_subscribe_url_valid() {
assert!(is_valid_sns_url(
"https://sns.us-east-1.amazonaws.com/?Action=ConfirmSubscription&TopicArn=arn:aws:sns:us-east-1:123456:test&Token=abc"
));
}
#[test]
fn test_sns_subscribe_url_ssrf_blocked() {
assert!(!is_valid_sns_url(
"http://169.254.169.254/latest/meta-data/"
));
assert!(!is_valid_sns_url("https://evil.com/steal-data"));
assert!(!is_valid_sns_url(
"http://sns.us-east-1.amazonaws.com/?Action=Confirm"
));
}
#[test]
fn test_pagerduty_signature_valid() {
let secret = "pd_secret_key";
let payload = b"{\"event\":{\"event_type\":\"incident.triggered\"}}";
let expected = compute_hmac_sha256_hex(secret.as_bytes(), payload);
let mut headers = HeaderMap::new();
headers.insert(
"X-PagerDuty-Signature",
format!("v1={expected}").parse().unwrap(),
);
assert!(verify_pagerduty(secret, payload, &headers).unwrap());
}
#[test]
fn test_pagerduty_signature_invalid() {
let mut headers = HeaderMap::new();
headers.insert(
"X-PagerDuty-Signature",
"v1=0000000000000000000000000000000000000000000000000000000000000000"
.parse()
.unwrap(),
);
assert!(!verify_pagerduty("secret", b"payload", &headers).unwrap());
}
#[test]
fn test_pagerduty_signature_missing() {
let headers = HeaderMap::new();
assert!(!verify_pagerduty("secret", b"payload", &headers).unwrap());
}
#[test]
fn test_pagerduty_signature_no_prefix() {
let secret = "pd_secret";
let payload = b"data";
let sig = compute_hmac_sha256_hex(secret.as_bytes(), payload);
let mut headers = HeaderMap::new();
headers.insert("X-PagerDuty-Signature", sig.parse().unwrap());
assert!(!verify_pagerduty(secret, payload, &headers).unwrap());
}
#[test]
fn test_grafana_signature_valid() {
let secret = "grafana_secret";
let payload = b"{\"status\":\"firing\",\"alerts\":[]}";
let expected = compute_hmac_sha256_hex(secret.as_bytes(), payload);
let mut headers = HeaderMap::new();
headers.insert("X-Grafana-Alerting-Signature", expected.parse().unwrap());
assert!(verify_grafana(secret, payload, &headers).unwrap());
}
#[test]
fn test_grafana_signature_invalid() {
let mut headers = HeaderMap::new();
headers.insert(
"X-Grafana-Alerting-Signature",
"0000000000000000000000000000000000000000000000000000000000000000"
.parse()
.unwrap(),
);
assert!(!verify_grafana("secret", b"payload", &headers).unwrap());
}
#[test]
fn test_grafana_signature_missing() {
let headers = HeaderMap::new();
assert!(!verify_grafana("secret", b"payload", &headers).unwrap());
}
#[test]
fn test_terraform_signature_valid() {
let secret = "tf_secret";
let payload = b"{\"payload_version\":1,\"notifications\":[]}";
let expected = compute_hmac_sha512_hex(secret.as_bytes(), payload);
let mut headers = HeaderMap::new();
headers.insert("X-TFE-Notification-Signature", expected.parse().unwrap());
assert!(verify_terraform(secret, payload, &headers).unwrap());
}
#[test]
fn test_terraform_signature_invalid() {
let mut headers = HeaderMap::new();
headers.insert(
"X-TFE-Notification-Signature",
"00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"
.parse()
.unwrap(),
);
assert!(!verify_terraform("secret", b"payload", &headers).unwrap());
}
#[test]
fn test_terraform_signature_missing() {
let headers = HeaderMap::new();
assert!(!verify_terraform("secret", b"payload", &headers).unwrap());
}
#[test]
fn test_gitlab_token_valid() {
let secret = "my_gitlab_token";
let payload = b"{\"object_kind\":\"push\"}";
let mut headers = HeaderMap::new();
headers.insert("X-Gitlab-Token", secret.parse().unwrap());
assert!(verify_gitlab(secret, payload, &headers).unwrap());
}
#[test]
fn test_gitlab_token_invalid() {
let mut headers = HeaderMap::new();
headers.insert("X-Gitlab-Token", "wrong_token".parse().unwrap());
assert!(!verify_gitlab("correct_token", b"payload", &headers).unwrap());
}
#[test]
fn test_gitlab_token_missing() {
let headers = HeaderMap::new();
assert!(!verify_gitlab("secret", b"payload", &headers).unwrap());
}
#[test]
fn test_verify_signature_pagerduty() {
let secret = "pd_key";
let payload = b"test";
let sig = compute_hmac_sha256_hex(secret.as_bytes(), payload);
let mut headers = HeaderMap::new();
headers.insert(
"X-PagerDuty-Signature",
format!("v1={sig}").parse().unwrap(),
);
assert!(verify_signature("pagerduty", secret, payload, &headers).unwrap());
}
#[test]
fn test_verify_signature_grafana() {
let secret = "gf_key";
let payload = b"test";
let sig = compute_hmac_sha256_hex(secret.as_bytes(), payload);
let mut headers = HeaderMap::new();
headers.insert("X-Grafana-Alerting-Signature", sig.parse().unwrap());
assert!(verify_signature("grafana", secret, payload, &headers).unwrap());
}
#[test]
fn test_verify_signature_terraform() {
let secret = "tf_key";
let payload = b"test";
let sig = compute_hmac_sha512_hex(secret.as_bytes(), payload);
let mut headers = HeaderMap::new();
headers.insert("X-TFE-Notification-Signature", sig.parse().unwrap());
assert!(verify_signature("terraform", secret, payload, &headers).unwrap());
}
#[test]
fn test_verify_signature_gitlab() {
let secret = "gl_token";
let mut headers = HeaderMap::new();
headers.insert("X-Gitlab-Token", secret.parse().unwrap());
assert!(verify_signature("gitlab", secret, b"any", &headers).unwrap());
}
#[test]
fn test_shopify_signature_invalid() {
let mut headers = HeaderMap::new();
headers.insert("X-Shopify-Hmac-SHA256", "bad-signature".parse().unwrap());
assert!(!verify_shopify("secret", b"payload", &headers).unwrap());
}
#[test]
fn test_shopify_signature_missing() {
let headers = HeaderMap::new();
assert!(!verify_shopify("secret", b"payload", &headers).unwrap());
}
#[test]
fn test_custom_hmac_invalid() {
let mut headers = HeaderMap::new();
headers.insert("X-Webhook-Signature", "wrong-hex".parse().unwrap());
assert!(!verify_custom_hmac("secret", b"payload", &headers).unwrap());
}
#[test]
fn test_custom_hmac_missing() {
let headers = HeaderMap::new();
assert!(!verify_custom_hmac("secret", b"payload", &headers).unwrap());
}
#[test]
fn test_stripe_signature_missing() {
let headers = HeaderMap::new();
assert!(!verify_stripe("whsec_test", b"payload", &headers).unwrap());
}
#[test]
fn test_cert_cache() {
let url = "https://sns.us-east-1.amazonaws.com/test-cache.pem";
assert!(get_cached_cert(url).is_none());
set_cached_cert(url, b"PEM DATA".to_vec());
let cached = get_cached_cert(url);
assert!(cached.is_some());
assert_eq!(cached.unwrap(), b"PEM DATA");
}
#[test]
fn test_standard_webhooks_valid() {
use base64::Engine;
let raw_key = b"test_standard_webhooks_key_12345";
let secret = format!(
"whsec_{}",
base64::engine::general_purpose::STANDARD.encode(raw_key)
);
let payload = b"{\"type\":\"user.created\",\"data\":{\"id\":\"usr_123\"}}";
let msg_id = "msg_01JEXAMPLE";
let timestamp = chrono::Utc::now().timestamp();
let mut mac = HmacSha256::new_from_slice(raw_key).unwrap();
mac.update(msg_id.as_bytes());
mac.update(b".");
mac.update(timestamp.to_string().as_bytes());
mac.update(b".");
mac.update(payload);
let sig = base64::engine::general_purpose::STANDARD.encode(mac.finalize().into_bytes());
let mut headers = HeaderMap::new();
headers.insert("webhook-id", msg_id.parse().unwrap());
headers.insert("webhook-timestamp", timestamp.to_string().parse().unwrap());
headers.insert("webhook-signature", format!("v1,{sig}").parse().unwrap());
assert!(verify_standard_webhooks(&secret, payload, &headers).unwrap());
}
#[test]
fn test_standard_webhooks_invalid_signature() {
let mut headers = HeaderMap::new();
headers.insert("webhook-id", "msg_test".parse().unwrap());
headers.insert(
"webhook-timestamp",
chrono::Utc::now().timestamp().to_string().parse().unwrap(),
);
headers.insert(
"webhook-signature",
"v1,aW52YWxpZHNpZ25hdHVyZQ==".parse().unwrap(),
);
assert!(!verify_standard_webhooks("whsec_dGVzdA==", b"payload", &headers).unwrap());
}
#[test]
fn test_standard_webhooks_missing_headers() {
let headers = HeaderMap::new();
assert!(!verify_standard_webhooks("whsec_dGVzdA==", b"payload", &headers).unwrap());
}
#[test]
fn test_standard_webhooks_expired_timestamp() {
use base64::Engine;
let raw_key = b"test_key_for_expired_timestamp_!";
let secret = format!(
"whsec_{}",
base64::engine::general_purpose::STANDARD.encode(raw_key)
);
let payload = b"{}";
let msg_id = "msg_expired";
let timestamp = chrono::Utc::now().timestamp() - 600;
let mut mac = HmacSha256::new_from_slice(raw_key).unwrap();
mac.update(msg_id.as_bytes());
mac.update(b".");
mac.update(timestamp.to_string().as_bytes());
mac.update(b".");
mac.update(payload);
let sig = base64::engine::general_purpose::STANDARD.encode(mac.finalize().into_bytes());
let mut headers = HeaderMap::new();
headers.insert("webhook-id", msg_id.parse().unwrap());
headers.insert("webhook-timestamp", timestamp.to_string().parse().unwrap());
headers.insert("webhook-signature", format!("v1,{sig}").parse().unwrap());
assert!(!verify_standard_webhooks(&secret, payload, &headers).unwrap());
}
#[test]
fn test_standard_webhooks_multiple_signatures() {
use base64::Engine;
let raw_key = b"test_multi_sig_key_0123456789ab";
let secret = format!(
"whsec_{}",
base64::engine::general_purpose::STANDARD.encode(raw_key)
);
let payload = b"{\"test\":true}";
let msg_id = "msg_multi";
let timestamp = chrono::Utc::now().timestamp();
let mut mac = HmacSha256::new_from_slice(raw_key).unwrap();
mac.update(msg_id.as_bytes());
mac.update(b".");
mac.update(timestamp.to_string().as_bytes());
mac.update(b".");
mac.update(payload);
let sig = base64::engine::general_purpose::STANDARD.encode(mac.finalize().into_bytes());
let mut headers = HeaderMap::new();
headers.insert("webhook-id", msg_id.parse().unwrap());
headers.insert("webhook-timestamp", timestamp.to_string().parse().unwrap());
headers.insert(
"webhook-signature",
format!("v1,invalidbase64== v1,{sig}").parse().unwrap(),
);
assert!(verify_standard_webhooks(&secret, payload, &headers).unwrap());
}
/// `verify_signature` called with the `"standard-webhooks"` provider accepts a
/// request signed over `<id>.<timestamp>.<body>` with the decoded `whsec_` key.
#[test]
fn test_verify_signature_dispatches_standard_webhooks() {
    use base64::Engine;
    let b64 = base64::engine::general_purpose::STANDARD;

    let key_bytes = b"dispatch_test_key_01234567890ab";
    let secret = format!("whsec_{}", b64.encode(key_bytes));
    let body = b"{}";
    let id = "msg_dispatch";
    let ts = chrono::Utc::now().timestamp().to_string();

    // Feed the signed content piece by piece, in wire order.
    let pieces: [&[u8]; 5] = [id.as_bytes(), b".", ts.as_bytes(), b".", body];
    let mut signer = HmacSha256::new_from_slice(key_bytes).unwrap();
    for piece in pieces {
        signer.update(piece);
    }
    let sig_b64 = b64.encode(signer.finalize().into_bytes());

    let mut headers = HeaderMap::new();
    headers.insert("webhook-id", id.parse().unwrap());
    headers.insert("webhook-timestamp", ts.parse().unwrap());
    headers.insert("webhook-signature", format!("v1,{sig_b64}").parse().unwrap());

    let accepted = verify_signature("standard-webhooks", &secret, body, &headers).unwrap();
    assert!(accepted);
}
/// The output of `compute_hmac_sha256_hex` over the body, sent as
/// `Linear-Signature`, is accepted by `verify_linear`.
#[test]
fn test_linear_signature_valid() {
    let key = "linear_secret_key";
    let body = b"{\"action\":\"create\",\"type\":\"Issue\"}";
    let digest_hex = compute_hmac_sha256_hex(key.as_bytes(), body);

    let mut headers = HeaderMap::new();
    headers.insert("Linear-Signature", digest_hex.parse().unwrap());

    let accepted = verify_linear(key, body, &headers).unwrap();
    assert!(accepted);
}
/// A `Linear-Signature` made of 64 zero characters does not verify.
#[test]
fn test_linear_signature_invalid() {
    let bogus_sig = "0000000000000000000000000000000000000000000000000000000000000000";

    let mut headers = HeaderMap::new();
    headers.insert("Linear-Signature", bogus_sig.parse().unwrap());

    let accepted = verify_linear("secret", b"payload", &headers).unwrap();
    assert!(!accepted);
}
/// With no headers at all, `verify_linear` returns `Ok(false)`.
#[test]
fn test_linear_signature_missing() {
    let no_headers = HeaderMap::new();
    let accepted = verify_linear("secret", b"payload", &no_headers).unwrap();
    assert!(!accepted);
}
/// `verify_signature` called with the `"linear"` provider accepts a request
/// whose `Linear-Signature` is the `compute_hmac_sha256_hex` of the body.
#[test]
fn test_verify_signature_dispatches_linear() {
    let key = "lin_key";
    let body = b"test";

    let mut headers = HeaderMap::new();
    let digest_hex = compute_hmac_sha256_hex(key.as_bytes(), body);
    headers.insert("Linear-Signature", digest_hex.parse().unwrap());

    let accepted = verify_signature("linear", key, body, &headers).unwrap();
    assert!(accepted);
}
/// A base64-encoded HMAC-SHA1 of the body, keyed with the auth token and sent
/// as `X-Twilio-Signature`, is accepted by `verify_twilio`.
#[test]
fn test_twilio_signature_valid() {
    use base64::Engine;
    type HmacSha1 = Hmac<sha1::Sha1>;

    let auth_token = "twilio_auth_token";
    let body = b"{\"AccountSid\":\"AC123\",\"Body\":\"Hello\"}";

    let mut signer = HmacSha1::new_from_slice(auth_token.as_bytes()).unwrap();
    signer.update(body);
    let tag = signer.finalize().into_bytes();
    let sig_b64 = base64::engine::general_purpose::STANDARD.encode(tag);

    let mut headers = HeaderMap::new();
    headers.insert("X-Twilio-Signature", sig_b64.parse().unwrap());

    let accepted = verify_twilio(auth_token, body, &headers).unwrap();
    assert!(accepted);
}
/// An `X-Twilio-Signature` value that was not computed from the secret and
/// body does not verify.
#[test]
fn test_twilio_signature_invalid() {
    let wrong_sig = "aW52YWxpZHNpZ25hdHVyZQ==";

    let mut headers = HeaderMap::new();
    headers.insert("X-Twilio-Signature", wrong_sig.parse().unwrap());

    let accepted = verify_twilio("secret", b"payload", &headers).unwrap();
    assert!(!accepted);
}
/// With no headers at all, `verify_twilio` returns `Ok(false)`.
#[test]
fn test_twilio_signature_missing() {
    let no_headers = HeaderMap::new();
    let accepted = verify_twilio("secret", b"payload", &no_headers).unwrap();
    assert!(!accepted);
}
/// `verify_signature` called with the `"twilio"` provider accepts a request
/// whose `X-Twilio-Signature` is the base64 HMAC-SHA1 of the body.
#[test]
fn test_verify_signature_dispatches_twilio() {
    use base64::Engine;
    type HmacSha1 = Hmac<sha1::Sha1>;

    let auth_token = "twilio_key";
    let body = b"test";

    let mut signer = HmacSha1::new_from_slice(auth_token.as_bytes()).unwrap();
    signer.update(body);
    let tag = signer.finalize().into_bytes();
    let sig_b64 = base64::engine::general_purpose::STANDARD.encode(tag);

    let mut headers = HeaderMap::new();
    headers.insert("X-Twilio-Signature", sig_b64.parse().unwrap());

    let accepted = verify_signature("twilio", auth_token, body, &headers).unwrap();
    assert!(accepted);
}
/// A `Paddle-Signature` of the form `ts=<now>;h1=<mac>`, where the MAC is
/// `compute_hmac_sha256_hex` over `<ts>:<body>`, is accepted by `verify_paddle`.
#[test]
fn test_paddle_signature_valid() {
    let key = "pdl_ntfset_abc123_secret";
    let body = b"{\"event_type\":\"subscription.created\",\"data\":{}}";
    let ts = chrono::Utc::now().timestamp().to_string();

    let signed_content = format!("{ts}:{}", String::from_utf8_lossy(body));
    let h1 = compute_hmac_sha256_hex(key.as_bytes(), signed_content.as_bytes());
    let header_value = format!("ts={ts};h1={h1}");

    let mut headers = HeaderMap::new();
    headers.insert("Paddle-Signature", header_value.parse().unwrap());

    let accepted = verify_paddle(key, body, &headers).unwrap();
    assert!(accepted);
}
/// A current timestamp paired with an all-zero `h1` value does not verify.
#[test]
fn test_paddle_signature_invalid() {
    let ts = chrono::Utc::now().timestamp().to_string();
    let header_value = format!(
        "ts={ts};h1=0000000000000000000000000000000000000000000000000000000000000000"
    );

    let mut headers = HeaderMap::new();
    headers.insert("Paddle-Signature", header_value.parse().unwrap());

    let accepted = verify_paddle("secret", b"payload", &headers).unwrap();
    assert!(!accepted);
}
/// With no headers at all, `verify_paddle` returns `Ok(false)`.
#[test]
fn test_paddle_signature_missing() {
    let no_headers = HeaderMap::new();
    let accepted = verify_paddle("secret", b"payload", &no_headers).unwrap();
    assert!(!accepted);
}
/// A correctly computed signature whose `ts` lies 600 seconds in the past is
/// expected to be rejected.
#[test]
fn test_paddle_signature_expired() {
    let key = "pdl_secret";
    let body = b"{}";
    let stale_secs = chrono::Utc::now().timestamp() - 600;
    let ts = stale_secs.to_string();

    let signed_content = format!("{ts}:{}", String::from_utf8_lossy(body));
    let h1 = compute_hmac_sha256_hex(key.as_bytes(), signed_content.as_bytes());
    let header_value = format!("ts={ts};h1={h1}");

    let mut headers = HeaderMap::new();
    headers.insert("Paddle-Signature", header_value.parse().unwrap());

    let accepted = verify_paddle(key, body, &headers).unwrap();
    assert!(!accepted);
}
/// The MAC itself is correct, but it is placed in the header without the
/// `h1=` label (`ts=<ts>;<mac>`); the request is expected to be rejected.
#[test]
fn test_paddle_signature_no_h1_prefix() {
    let key = "pdl_secret";
    let body = b"data";
    let ts = chrono::Utc::now().timestamp().to_string();

    let signed_content = format!("{ts}:{}", String::from_utf8_lossy(body));
    let mac_hex = compute_hmac_sha256_hex(key.as_bytes(), signed_content.as_bytes());
    let header_value = format!("ts={ts};{mac_hex}");

    let mut headers = HeaderMap::new();
    headers.insert("Paddle-Signature", header_value.parse().unwrap());

    let accepted = verify_paddle(key, body, &headers).unwrap();
    assert!(!accepted);
}
/// `verify_signature` called with the `"paddle"` provider accepts a request
/// carrying a fresh `ts=<ts>;h1=<mac>` header computed over `<ts>:<body>`.
#[test]
fn test_verify_signature_dispatches_paddle() {
    let key = "pdl_key";
    let body = b"test";
    let ts = chrono::Utc::now().timestamp().to_string();

    let signed_content = format!("{ts}:{}", String::from_utf8_lossy(body));
    let h1 = compute_hmac_sha256_hex(key.as_bytes(), signed_content.as_bytes());
    let header_value = format!("ts={ts};h1={h1}");

    let mut headers = HeaderMap::new();
    headers.insert("Paddle-Signature", header_value.parse().unwrap());

    let accepted = verify_signature("paddle", key, body, &headers).unwrap();
    assert!(accepted);
}
/// Signing the same secret, message id, timestamp and payload twice yields
/// equal signatures.
#[test]
fn test_sign_outbound_payload_deterministic() {
    let sign_once = || {
        sign_outbound_payload(
            "whsec_dGVzdF9zZWNyZXRfa2V5X2Zvcl9zaWduaW5n",
            "msg_001",
            1710000000i64,
            b"{\"order_id\":\"123\"}",
        )
    };

    let first = sign_once();
    let second = sign_once();
    assert_eq!(first, second, "Same inputs must produce same signature");
}
/// Holding message id, timestamp and payload fixed, two different secrets
/// produce different signatures.
#[test]
fn test_sign_outbound_payload_different_secret() {
    let sign_with = |secret: &str| {
        sign_outbound_payload(secret, "msg_001", 1710000000i64, b"{\"order_id\":\"123\"}")
    };

    let with_secret_a = sign_with("whsec_c2VjcmV0X2E=");
    let with_secret_b = sign_with("whsec_c2VjcmV0X2I=");
    assert_ne!(
        with_secret_a, with_secret_b,
        "Different secrets must produce different signatures"
    );
}
/// Holding secret, message id and payload fixed, timestamps 1000 and 2000
/// produce different signatures.
#[test]
fn test_sign_outbound_payload_different_timestamp() {
    let sign_at = |ts: i64| sign_outbound_payload("whsec_dGVzdA==", "msg_001", ts, b"{}");

    let at_1000 = sign_at(1000);
    let at_2000 = sign_at(2000);
    assert_ne!(
        at_1000, at_2000,
        "Different timestamps must produce different signatures"
    );
}
/// The signature is 44 characters long and uses only characters from the
/// padded standard base64 alphabet.
#[test]
fn test_sign_outbound_payload_base64_format() {
    let signature = sign_outbound_payload("whsec_c2VjcmV0", "msg_001", 1710000000, b"test");

    assert_eq!(signature.len(), 44);

    let only_base64_chars = signature
        .chars()
        .all(|ch| ch.is_ascii_alphanumeric() || matches!(ch, '+' | '/' | '='));
    assert!(only_base64_chars);
}
/// `sign_outbound_payload` must equal an independently computed
/// base64(HMAC-SHA256(raw key, `<msg_id>.<timestamp>.<payload>`)), where the
/// raw key is the bytes encoded after the `whsec_` prefix.
#[test]
fn test_sign_outbound_payload_verifiable() {
    use base64::Engine;
    let b64 = base64::engine::general_purpose::STANDARD;

    let key_bytes = b"verify_test_key_0123456789abcdef";
    let secret = format!("whsec_{}", b64.encode(key_bytes));
    let id = "msg_verify";
    let ts = 1710000000i64;
    let body = b"{\"amount\":5000}";

    let actual = sign_outbound_payload(&secret, id, ts, body);

    // Recompute the MAC by hand; the prefix renders as `msg_verify.1710000000.`.
    let mut reference = HmacSha256::new_from_slice(key_bytes).unwrap();
    reference.update(format!("{id}.{ts}.").as_bytes());
    reference.update(body);
    let expected = b64.encode(reference.finalize().into_bytes());

    assert_eq!(actual, expected);
}
/// Holding secret, timestamp and payload fixed, two different message ids
/// produce different signatures.
#[test]
fn test_sign_outbound_msg_id_included() {
    let sign_for = |msg_id: &str| sign_outbound_payload("whsec_dGVzdA==", msg_id, 1000, b"{}");

    let for_first_id = sign_for("msg_001");
    let for_second_id = sign_for("msg_002");
    assert_ne!(
        for_first_id, for_second_id,
        "Different msg_ids must produce different signatures"
    );
}
/// A freshly generated signing secret begins with `whsec_`.
#[test]
fn test_generate_signing_secret_prefix() {
    let generated = generate_signing_secret();
    let has_prefix = generated.starts_with("whsec_");
    assert!(
        has_prefix,
        "Secret must start with whsec_ prefix, got: {}",
        generated
    );
}
/// Two consecutive calls to `generate_signing_secret` return different values.
#[test]
fn test_generate_signing_secret_unique() {
    let first = generate_signing_secret();
    let second = generate_signing_secret();
    assert_ne!(first, second, "Each generated secret must be unique");
}
/// The part of a generated secret after `whsec_` is valid standard base64 and
/// decodes to exactly 32 bytes.
#[test]
fn test_generate_signing_secret_decodable() {
    use base64::Engine;

    let generated = generate_signing_secret();
    let encoded_key = generated.strip_prefix("whsec_").unwrap();

    let outcome = base64::engine::general_purpose::STANDARD.decode(encoded_key);
    assert!(outcome.is_ok(), "Secret base64 part must be decodable");

    let key_len = outcome.unwrap().len();
    assert_eq!(key_len, 32, "Decoded key must be 32 bytes");
}
}