use std::collections::HashMap;
use crate::auth::Session;
use crate::clients::GraphqlClient;
use crate::config::ShopifyConfig;
use super::errors::WebhookError;
use super::types::{
WebhookDeliveryMethod, WebhookHandler, WebhookRegistration, WebhookRegistrationResult,
WebhookTopic,
};
use super::verification::{verify_webhook, WebhookRequest};
#[derive(Default)]
pub struct WebhookRegistry {
registrations: HashMap<WebhookTopic, WebhookRegistration>,
handlers: HashMap<WebhookTopic, Box<dyn WebhookHandler>>,
}
impl std::fmt::Debug for WebhookRegistry {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("WebhookRegistry")
.field("registrations", &self.registrations)
.field("handlers", &format!("<{} handlers>", self.handlers.len()))
.finish()
}
}
const _: fn() = || {
const fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<WebhookRegistry>();
};
impl WebhookRegistry {
#[must_use]
pub fn new() -> Self {
Self {
registrations: HashMap::new(),
handlers: HashMap::new(),
}
}
pub fn add_registration(&mut self, mut registration: WebhookRegistration) -> &mut Self {
let topic = registration.topic;
if let Some(handler) = registration.handler.take() {
self.handlers.insert(topic, handler);
}
self.registrations.insert(topic, registration);
self
}
#[must_use]
pub fn get_registration(&self, topic: &WebhookTopic) -> Option<&WebhookRegistration> {
self.registrations.get(topic)
}
#[must_use]
pub fn list_registrations(&self) -> Vec<&WebhookRegistration> {
self.registrations.values().collect()
}
pub async fn process(
&self,
config: &ShopifyConfig,
request: &WebhookRequest,
) -> Result<(), WebhookError> {
let context = verify_webhook(config, request)?;
let handler = match context.topic() {
Some(topic) => self.handlers.get(&topic),
None => None,
};
let handler = handler.ok_or_else(|| WebhookError::NoHandlerForTopic {
topic: context.topic_raw().to_string(),
})?;
let payload: serde_json::Value =
serde_json::from_slice(request.body()).map_err(|e| WebhookError::PayloadParseError {
message: e.to_string(),
})?;
handler.handle(context, payload).await
}
pub async fn register(
&self,
session: &Session,
config: &ShopifyConfig,
topic: &WebhookTopic,
) -> Result<WebhookRegistrationResult, WebhookError> {
let registration = self
.get_registration(topic)
.ok_or_else(|| WebhookError::RegistrationNotFound {
topic: topic.clone(),
})?;
let graphql_topic = topic_to_graphql_format(topic);
let client = GraphqlClient::new(session, Some(config));
let existing = self
.query_existing_subscription(&client, &graphql_topic, ®istration.delivery_method)
.await?;
match existing {
Some((id, existing_config)) => {
if self.config_matches(&existing_config, registration) {
Ok(WebhookRegistrationResult::AlreadyRegistered { id })
} else {
self.update_subscription(&client, &id, registration).await
}
}
None => {
self.create_subscription(&client, &graphql_topic, registration)
.await
}
}
}
pub async fn register_all(
&self,
session: &Session,
config: &ShopifyConfig,
) -> Vec<WebhookRegistrationResult> {
let mut results = Vec::new();
for registration in self.registrations.values() {
let result = match self.register(session, config, ®istration.topic).await {
Ok(result) => result,
Err(error) => WebhookRegistrationResult::Failed(error),
};
results.push(result);
}
results
}
pub async fn unregister(
&self,
session: &Session,
config: &ShopifyConfig,
topic: &WebhookTopic,
) -> Result<(), WebhookError> {
let registration = self
.get_registration(topic)
.ok_or_else(|| WebhookError::RegistrationNotFound {
topic: topic.clone(),
})?;
let graphql_topic = topic_to_graphql_format(topic);
let client = GraphqlClient::new(session, Some(config));
let existing = self
.query_existing_subscription(&client, &graphql_topic, ®istration.delivery_method)
.await?;
match existing {
Some((id, _)) => {
self.delete_subscription(&client, &id).await
}
None => Err(WebhookError::SubscriptionNotFound {
topic: topic.clone(),
}),
}
}
pub async fn unregister_all(
&self,
session: &Session,
config: &ShopifyConfig,
) -> Result<(), WebhookError> {
let mut first_error: Option<WebhookError> = None;
for registration in self.registrations.values() {
if let Err(error) = self.unregister(session, config, ®istration.topic).await {
if first_error.is_none() {
first_error = Some(error);
}
}
}
match first_error {
Some(error) => Err(error),
None => Ok(()),
}
}
async fn query_existing_subscription(
&self,
client: &GraphqlClient,
graphql_topic: &str,
delivery_method: &WebhookDeliveryMethod,
) -> Result<Option<(String, ExistingWebhookConfig)>, WebhookError> {
let query = format!(
r#"
query {{
webhookSubscriptions(first: 25, topics: [{topic}]) {{
edges {{
node {{
id
endpoint {{
... on WebhookHttpEndpoint {{
callbackUrl
}}
... on WebhookEventBridgeEndpoint {{
arn
}}
... on WebhookPubSubEndpoint {{
pubSubProject
pubSubTopic
}}
}}
includeFields
metafieldNamespaces
filter
}}
}}
}}
}}
"#,
topic = graphql_topic
);
let response = client.query(&query, None, None, None).await?;
let edges = response.body["data"]["webhookSubscriptions"]["edges"]
.as_array()
.ok_or_else(|| WebhookError::ShopifyError {
message: "Invalid response structure".to_string(),
})?;
if edges.is_empty() {
return Ok(None);
}
for edge in edges {
let node = &edge["node"];
let endpoint = &node["endpoint"];
let parsed_delivery_method = if let Some(uri) = endpoint["callbackUrl"].as_str() {
Some(WebhookDeliveryMethod::Http {
uri: uri.to_string(),
})
} else if let Some(arn) = endpoint["arn"].as_str() {
Some(WebhookDeliveryMethod::EventBridge {
arn: arn.to_string(),
})
} else if let (Some(project), Some(topic)) = (
endpoint["pubSubProject"].as_str(),
endpoint["pubSubTopic"].as_str(),
) {
Some(WebhookDeliveryMethod::PubSub {
project_id: project.to_string(),
topic_id: topic.to_string(),
})
} else {
None
};
if let Some(ref parsed_method) = parsed_delivery_method {
let type_matches = match (parsed_method, delivery_method) {
(WebhookDeliveryMethod::Http { .. }, WebhookDeliveryMethod::Http { .. }) => {
true
}
(
WebhookDeliveryMethod::EventBridge { .. },
WebhookDeliveryMethod::EventBridge { .. },
) => true,
(
WebhookDeliveryMethod::PubSub { .. },
WebhookDeliveryMethod::PubSub { .. },
) => true,
_ => false,
};
if type_matches {
let id = node["id"]
.as_str()
.ok_or_else(|| WebhookError::ShopifyError {
message: "Missing webhook ID".to_string(),
})?
.to_string();
let include_fields = node["includeFields"].as_array().map(|arr| {
arr.iter()
.filter_map(|v| v.as_str().map(String::from))
.collect()
});
let metafield_namespaces = node["metafieldNamespaces"].as_array().map(|arr| {
arr.iter()
.filter_map(|v| v.as_str().map(String::from))
.collect()
});
let filter = node["filter"].as_str().map(String::from);
return Ok(Some((
id,
ExistingWebhookConfig {
delivery_method: parsed_method.clone(),
include_fields,
metafield_namespaces,
filter,
},
)));
}
}
}
Ok(None)
}
fn config_matches(
&self,
existing: &ExistingWebhookConfig,
registration: &WebhookRegistration,
) -> bool {
existing.delivery_method == registration.delivery_method
&& existing.include_fields == registration.include_fields
&& existing.metafield_namespaces == registration.metafield_namespaces
&& existing.filter == registration.filter
}
async fn create_subscription(
&self,
client: &GraphqlClient,
graphql_topic: &str,
registration: &WebhookRegistration,
) -> Result<WebhookRegistrationResult, WebhookError> {
let delivery_input = build_delivery_input(®istration.delivery_method);
let include_fields_input = registration
.include_fields
.as_ref()
.map(|fields| {
let quoted: Vec<String> = fields.iter().map(|f| format!("\"{}\"", f)).collect();
format!(", includeFields: [{}]", quoted.join(", "))
})
.unwrap_or_default();
let metafield_namespaces_input = registration
.metafield_namespaces
.as_ref()
.map(|ns| {
let quoted: Vec<String> = ns.iter().map(|n| format!("\"{}\"", n)).collect();
format!(", metafieldNamespaces: [{}]", quoted.join(", "))
})
.unwrap_or_default();
let filter_input = registration
.filter
.as_ref()
.map(|f| format!(", filter: \"{}\"", f))
.unwrap_or_default();
let mutation = format!(
r#"
mutation {{
webhookSubscriptionCreate(
topic: {topic},
webhookSubscription: {{
{delivery}{include_fields}{metafield_namespaces}{filter}
}}
) {{
webhookSubscription {{
id
}}
userErrors {{
field
message
}}
}}
}}
"#,
topic = graphql_topic,
delivery = delivery_input,
include_fields = include_fields_input,
metafield_namespaces = metafield_namespaces_input,
filter = filter_input
);
let response = client.query(&mutation, None, None, None).await?;
let user_errors = &response.body["data"]["webhookSubscriptionCreate"]["userErrors"];
if let Some(errors) = user_errors.as_array() {
if !errors.is_empty() {
let messages: Vec<String> = errors
.iter()
.filter_map(|e| e["message"].as_str().map(String::from))
.collect();
return Err(WebhookError::ShopifyError {
message: messages.join("; "),
});
}
}
let id = response.body["data"]["webhookSubscriptionCreate"]["webhookSubscription"]["id"]
.as_str()
.ok_or_else(|| WebhookError::ShopifyError {
message: "Missing webhook subscription ID in response".to_string(),
})?
.to_string();
Ok(WebhookRegistrationResult::Created { id })
}
async fn update_subscription(
&self,
client: &GraphqlClient,
id: &str,
registration: &WebhookRegistration,
) -> Result<WebhookRegistrationResult, WebhookError> {
let delivery_input = build_delivery_input(®istration.delivery_method);
let include_fields_input = registration
.include_fields
.as_ref()
.map(|fields| {
let quoted: Vec<String> = fields.iter().map(|f| format!("\"{}\"", f)).collect();
format!(", includeFields: [{}]", quoted.join(", "))
})
.unwrap_or_default();
let metafield_namespaces_input = registration
.metafield_namespaces
.as_ref()
.map(|ns| {
let quoted: Vec<String> = ns.iter().map(|n| format!("\"{}\"", n)).collect();
format!(", metafieldNamespaces: [{}]", quoted.join(", "))
})
.unwrap_or_default();
let filter_input = registration
.filter
.as_ref()
.map(|f| format!(", filter: \"{}\"", f))
.unwrap_or_default();
let mutation = format!(
r#"
mutation {{
webhookSubscriptionUpdate(
id: "{id}",
webhookSubscription: {{
{delivery}{include_fields}{metafield_namespaces}{filter}
}}
) {{
webhookSubscription {{
id
}}
userErrors {{
field
message
}}
}}
}}
"#,
id = id,
delivery = delivery_input,
include_fields = include_fields_input,
metafield_namespaces = metafield_namespaces_input,
filter = filter_input
);
let response = client.query(&mutation, None, None, None).await?;
let user_errors = &response.body["data"]["webhookSubscriptionUpdate"]["userErrors"];
if let Some(errors) = user_errors.as_array() {
if !errors.is_empty() {
let messages: Vec<String> = errors
.iter()
.filter_map(|e| e["message"].as_str().map(String::from))
.collect();
return Err(WebhookError::ShopifyError {
message: messages.join("; "),
});
}
}
Ok(WebhookRegistrationResult::Updated { id: id.to_string() })
}
async fn delete_subscription(
&self,
client: &GraphqlClient,
id: &str,
) -> Result<(), WebhookError> {
let mutation = format!(
r#"
mutation {{
webhookSubscriptionDelete(id: "{id}") {{
deletedWebhookSubscriptionId
userErrors {{
field
message
}}
}}
}}
"#,
id = id
);
let response = client.query(&mutation, None, None, None).await?;
let user_errors = &response.body["data"]["webhookSubscriptionDelete"]["userErrors"];
if let Some(errors) = user_errors.as_array() {
if !errors.is_empty() {
let messages: Vec<String> = errors
.iter()
.filter_map(|e| e["message"].as_str().map(String::from))
.collect();
return Err(WebhookError::ShopifyError {
message: messages.join("; "),
});
}
}
Ok(())
}
}
#[derive(Debug, Clone)]
struct ExistingWebhookConfig {
delivery_method: WebhookDeliveryMethod,
include_fields: Option<Vec<String>>,
metafield_namespaces: Option<Vec<String>>,
filter: Option<String>,
}
fn build_delivery_input(delivery_method: &WebhookDeliveryMethod) -> String {
match delivery_method {
WebhookDeliveryMethod::Http { uri } => {
format!("uri: \"{}\"", uri)
}
WebhookDeliveryMethod::EventBridge { arn } => {
format!("uri: \"{}\"", arn)
}
WebhookDeliveryMethod::PubSub {
project_id,
topic_id,
} => {
format!("uri: \"pubsub://{}:{}\"", project_id, topic_id)
}
}
}
fn topic_to_graphql_format(topic: &WebhookTopic) -> String {
let json_str = serde_json::to_string(topic).unwrap_or_default();
json_str
.trim_matches('"')
.replace('/', "_")
.to_uppercase()
}
#[cfg(test)]
mod tests {
use super::*;
use crate::auth::oauth::hmac::compute_signature_base64;
use crate::config::{ApiKey, ApiSecretKey};
use crate::webhooks::types::BoxFuture;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
struct TestHandler {
invoked: Arc<AtomicBool>,
}
impl WebhookHandler for TestHandler {
fn handle<'a>(
&'a self,
_context: super::super::verification::WebhookContext,
_payload: serde_json::Value,
) -> BoxFuture<'a, Result<(), WebhookError>> {
let invoked = self.invoked.clone();
Box::pin(async move {
invoked.store(true, Ordering::SeqCst);
Ok(())
})
}
}
struct ErrorHandler {
error_message: String,
}
impl WebhookHandler for ErrorHandler {
fn handle<'a>(
&'a self,
_context: super::super::verification::WebhookContext,
_payload: serde_json::Value,
) -> BoxFuture<'a, Result<(), WebhookError>> {
let message = self.error_message.clone();
Box::pin(async move { Err(WebhookError::ShopifyError { message }) })
}
}
#[test]
fn test_existing_config_with_http_delivery() {
let config = ExistingWebhookConfig {
delivery_method: WebhookDeliveryMethod::Http {
uri: "https://example.com/webhooks".to_string(),
},
include_fields: Some(vec!["id".to_string()]),
metafield_namespaces: None,
filter: None,
};
assert!(matches!(
config.delivery_method,
WebhookDeliveryMethod::Http { .. }
));
}
#[test]
fn test_existing_config_with_eventbridge_delivery() {
let config = ExistingWebhookConfig {
delivery_method: WebhookDeliveryMethod::EventBridge {
arn: "arn:aws:events:us-east-1::event-source/test".to_string(),
},
include_fields: None,
metafield_namespaces: None,
filter: Some("status:active".to_string()),
};
assert!(matches!(
config.delivery_method,
WebhookDeliveryMethod::EventBridge { .. }
));
assert!(config.filter.is_some());
}
#[test]
fn test_existing_config_with_pubsub_delivery() {
let config = ExistingWebhookConfig {
delivery_method: WebhookDeliveryMethod::PubSub {
project_id: "my-project".to_string(),
topic_id: "my-topic".to_string(),
},
include_fields: None,
metafield_namespaces: Some(vec!["custom".to_string()]),
filter: None,
};
match config.delivery_method {
WebhookDeliveryMethod::PubSub {
project_id,
topic_id,
} => {
assert_eq!(project_id, "my-project");
assert_eq!(topic_id, "my-topic");
}
_ => panic!("Expected PubSub delivery method"),
}
}
#[test]
fn test_build_delivery_input_http() {
let method = WebhookDeliveryMethod::Http {
uri: "https://example.com/webhooks".to_string(),
};
let input = build_delivery_input(&method);
assert_eq!(input, "uri: \"https://example.com/webhooks\"");
}
#[test]
fn test_build_delivery_input_eventbridge() {
let method = WebhookDeliveryMethod::EventBridge {
arn: "arn:aws:events:us-east-1::event-source/test".to_string(),
};
let input = build_delivery_input(&method);
assert_eq!(
input,
"uri: \"arn:aws:events:us-east-1::event-source/test\""
);
}
#[test]
fn test_build_delivery_input_pubsub() {
let method = WebhookDeliveryMethod::PubSub {
project_id: "my-project".to_string(),
topic_id: "my-topic".to_string(),
};
let input = build_delivery_input(&method);
assert_eq!(input, "uri: \"pubsub://my-project:my-topic\"");
}
#[test]
fn test_config_matches_http_same_url() {
let registry = WebhookRegistry::new();
let existing = ExistingWebhookConfig {
delivery_method: WebhookDeliveryMethod::Http {
uri: "https://example.com/webhooks".to_string(),
},
include_fields: None,
metafield_namespaces: None,
filter: None,
};
let registration = WebhookRegistrationBuilder::new(
WebhookTopic::OrdersCreate,
WebhookDeliveryMethod::Http {
uri: "https://example.com/webhooks".to_string(),
},
)
.build();
assert!(registry.config_matches(&existing, ®istration));
}
#[test]
fn test_config_matches_http_different_url() {
let registry = WebhookRegistry::new();
let existing = ExistingWebhookConfig {
delivery_method: WebhookDeliveryMethod::Http {
uri: "https://example.com/webhooks".to_string(),
},
include_fields: None,
metafield_namespaces: None,
filter: None,
};
let registration = WebhookRegistrationBuilder::new(
WebhookTopic::OrdersCreate,
WebhookDeliveryMethod::Http {
uri: "https://different.com/webhooks".to_string(),
},
)
.build();
assert!(!registry.config_matches(&existing, ®istration));
}
#[test]
fn test_config_matches_eventbridge_same_arn() {
let registry = WebhookRegistry::new();
let existing = ExistingWebhookConfig {
delivery_method: WebhookDeliveryMethod::EventBridge {
arn: "arn:aws:events:us-east-1::event-source/test".to_string(),
},
include_fields: None,
metafield_namespaces: None,
filter: None,
};
let registration = WebhookRegistrationBuilder::new(
WebhookTopic::OrdersCreate,
WebhookDeliveryMethod::EventBridge {
arn: "arn:aws:events:us-east-1::event-source/test".to_string(),
},
)
.build();
assert!(registry.config_matches(&existing, ®istration));
}
#[test]
fn test_config_matches_pubsub_same_project_and_topic() {
let registry = WebhookRegistry::new();
let existing = ExistingWebhookConfig {
delivery_method: WebhookDeliveryMethod::PubSub {
project_id: "my-project".to_string(),
topic_id: "my-topic".to_string(),
},
include_fields: None,
metafield_namespaces: None,
filter: None,
};
let registration = WebhookRegistrationBuilder::new(
WebhookTopic::OrdersCreate,
WebhookDeliveryMethod::PubSub {
project_id: "my-project".to_string(),
topic_id: "my-topic".to_string(),
},
)
.build();
assert!(registry.config_matches(&existing, ®istration));
}
#[test]
fn test_config_matches_different_delivery_methods_never_match() {
let registry = WebhookRegistry::new();
let existing = ExistingWebhookConfig {
delivery_method: WebhookDeliveryMethod::Http {
uri: "https://example.com/webhooks".to_string(),
},
include_fields: None,
metafield_namespaces: None,
filter: None,
};
let registration = WebhookRegistrationBuilder::new(
WebhookTopic::OrdersCreate,
WebhookDeliveryMethod::EventBridge {
arn: "arn:aws:events:us-east-1::event-source/test".to_string(),
},
)
.build();
assert!(!registry.config_matches(&existing, ®istration));
}
#[test]
fn test_config_matches_includes_other_fields() {
let registry = WebhookRegistry::new();
let existing = ExistingWebhookConfig {
delivery_method: WebhookDeliveryMethod::Http {
uri: "https://example.com/webhooks".to_string(),
},
include_fields: Some(vec!["id".to_string()]),
metafield_namespaces: Some(vec!["custom".to_string()]),
filter: Some("status:active".to_string()),
};
let registration = WebhookRegistrationBuilder::new(
WebhookTopic::OrdersCreate,
WebhookDeliveryMethod::Http {
uri: "https://example.com/webhooks".to_string(),
},
)
.include_fields(vec!["id".to_string()])
.metafield_namespaces(vec!["custom".to_string()])
.filter("status:active".to_string())
.build();
assert!(registry.config_matches(&existing, ®istration));
let registration_different = WebhookRegistrationBuilder::new(
WebhookTopic::OrdersCreate,
WebhookDeliveryMethod::Http {
uri: "https://example.com/webhooks".to_string(),
},
)
.include_fields(vec!["id".to_string()])
.metafield_namespaces(vec!["custom".to_string()])
.filter("status:inactive".to_string())
.build();
assert!(!registry.config_matches(&existing, ®istration_different));
}
#[test]
fn test_registry_accepts_http_delivery() {
let mut registry = WebhookRegistry::new();
registry.add_registration(
WebhookRegistrationBuilder::new(
WebhookTopic::OrdersCreate,
WebhookDeliveryMethod::Http {
uri: "https://example.com/webhooks".to_string(),
},
)
.build(),
);
let registration = registry.get_registration(&WebhookTopic::OrdersCreate).unwrap();
assert!(matches!(
registration.delivery_method,
WebhookDeliveryMethod::Http { .. }
));
}
#[test]
fn test_registry_accepts_eventbridge_delivery() {
let mut registry = WebhookRegistry::new();
registry.add_registration(
WebhookRegistrationBuilder::new(
WebhookTopic::OrdersCreate,
WebhookDeliveryMethod::EventBridge {
arn: "arn:aws:events:us-east-1::event-source/test".to_string(),
},
)
.build(),
);
let registration = registry.get_registration(&WebhookTopic::OrdersCreate).unwrap();
assert!(matches!(
registration.delivery_method,
WebhookDeliveryMethod::EventBridge { .. }
));
}
#[test]
fn test_registry_accepts_pubsub_delivery() {
let mut registry = WebhookRegistry::new();
registry.add_registration(
WebhookRegistrationBuilder::new(
WebhookTopic::OrdersCreate,
WebhookDeliveryMethod::PubSub {
project_id: "my-project".to_string(),
topic_id: "my-topic".to_string(),
},
)
.build(),
);
let registration = registry.get_registration(&WebhookTopic::OrdersCreate).unwrap();
assert!(matches!(
registration.delivery_method,
WebhookDeliveryMethod::PubSub { .. }
));
}
#[test]
fn test_registry_allows_mixed_delivery_methods() {
let mut registry = WebhookRegistry::new();
registry
.add_registration(
WebhookRegistrationBuilder::new(
WebhookTopic::OrdersCreate,
WebhookDeliveryMethod::Http {
uri: "https://example.com/webhooks".to_string(),
},
)
.build(),
)
.add_registration(
WebhookRegistrationBuilder::new(
WebhookTopic::ProductsUpdate,
WebhookDeliveryMethod::EventBridge {
arn: "arn:aws:events:us-east-1::event-source/test".to_string(),
},
)
.build(),
)
.add_registration(
WebhookRegistrationBuilder::new(
WebhookTopic::CustomersCreate,
WebhookDeliveryMethod::PubSub {
project_id: "my-project".to_string(),
topic_id: "my-topic".to_string(),
},
)
.build(),
);
assert_eq!(registry.list_registrations().len(), 3);
assert!(matches!(
registry
.get_registration(&WebhookTopic::OrdersCreate)
.unwrap()
.delivery_method,
WebhookDeliveryMethod::Http { .. }
));
assert!(matches!(
registry
.get_registration(&WebhookTopic::ProductsUpdate)
.unwrap()
.delivery_method,
WebhookDeliveryMethod::EventBridge { .. }
));
assert!(matches!(
registry
.get_registration(&WebhookTopic::CustomersCreate)
.unwrap()
.delivery_method,
WebhookDeliveryMethod::PubSub { .. }
));
}
#[test]
fn test_webhook_registry_new_creates_empty_registry() {
let registry = WebhookRegistry::new();
assert!(registry.list_registrations().is_empty());
}
#[test]
fn test_add_registration_stores_registration() {
let mut registry = WebhookRegistry::new();
registry.add_registration(
WebhookRegistrationBuilder::new(
WebhookTopic::OrdersCreate,
WebhookDeliveryMethod::Http {
uri: "https://example.com/webhooks/orders".to_string(),
},
)
.build(),
);
assert_eq!(registry.list_registrations().len(), 1);
assert!(registry.get_registration(&WebhookTopic::OrdersCreate).is_some());
}
#[test]
fn test_add_registration_overwrites_same_topic() {
let mut registry = WebhookRegistry::new();
registry.add_registration(
WebhookRegistrationBuilder::new(
WebhookTopic::OrdersCreate,
WebhookDeliveryMethod::Http {
uri: "https://example.com/webhooks/v1/orders".to_string(),
},
)
.build(),
);
registry.add_registration(
WebhookRegistrationBuilder::new(
WebhookTopic::OrdersCreate,
WebhookDeliveryMethod::Http {
uri: "https://example.com/webhooks/v2/orders".to_string(),
},
)
.build(),
);
assert_eq!(registry.list_registrations().len(), 1);
let registration = registry.get_registration(&WebhookTopic::OrdersCreate).unwrap();
match ®istration.delivery_method {
WebhookDeliveryMethod::Http { uri } => {
assert_eq!(uri, "https://example.com/webhooks/v2/orders");
}
_ => panic!("Expected Http delivery method"),
}
}
#[test]
fn test_get_registration_returns_none_for_missing_topic() {
let registry = WebhookRegistry::new();
assert!(registry.get_registration(&WebhookTopic::OrdersCreate).is_none());
}
#[test]
fn test_list_registrations_returns_all() {
let mut registry = WebhookRegistry::new();
registry
.add_registration(
WebhookRegistrationBuilder::new(
WebhookTopic::OrdersCreate,
WebhookDeliveryMethod::Http {
uri: "https://example.com/webhooks/orders".to_string(),
},
)
.build(),
)
.add_registration(
WebhookRegistrationBuilder::new(
WebhookTopic::ProductsCreate,
WebhookDeliveryMethod::Http {
uri: "https://example.com/webhooks/products".to_string(),
},
)
.build(),
)
.add_registration(
WebhookRegistrationBuilder::new(
WebhookTopic::CustomersCreate,
WebhookDeliveryMethod::Http {
uri: "https://example.com/webhooks/customers".to_string(),
},
)
.build(),
);
let registrations = registry.list_registrations();
assert_eq!(registrations.len(), 3);
}
#[test]
fn test_webhook_registry_is_send_sync() {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<WebhookRegistry>();
}
#[test]
fn test_topic_to_graphql_format_orders_create() {
let topic = WebhookTopic::OrdersCreate;
let graphql_format = topic_to_graphql_format(&topic);
assert_eq!(graphql_format, "ORDERS_CREATE");
}
#[test]
fn test_topic_to_graphql_format_products_update() {
let topic = WebhookTopic::ProductsUpdate;
let graphql_format = topic_to_graphql_format(&topic);
assert_eq!(graphql_format, "PRODUCTS_UPDATE");
}
#[test]
fn test_topic_to_graphql_format_app_uninstalled() {
let topic = WebhookTopic::AppUninstalled;
let graphql_format = topic_to_graphql_format(&topic);
assert_eq!(graphql_format, "APP_UNINSTALLED");
}
#[test]
fn test_topic_to_graphql_format_inventory_levels_update() {
let topic = WebhookTopic::InventoryLevelsUpdate;
let graphql_format = topic_to_graphql_format(&topic);
assert_eq!(graphql_format, "INVENTORY_LEVELS_UPDATE");
}
#[test]
fn test_add_registration_returns_mut_self_for_chaining() {
let mut registry = WebhookRegistry::new();
let chain_result = registry
.add_registration(
WebhookRegistrationBuilder::new(
WebhookTopic::OrdersCreate,
WebhookDeliveryMethod::Http {
uri: "https://example.com/webhooks/orders".to_string(),
},
)
.build(),
)
.add_registration(
WebhookRegistrationBuilder::new(
WebhookTopic::ProductsCreate,
WebhookDeliveryMethod::Http {
uri: "https://example.com/webhooks/products".to_string(),
},
)
.build(),
);
assert_eq!(chain_result.list_registrations().len(), 2);
}
#[test]
fn test_add_registration_extracts_and_stores_handler_separately() {
let invoked = Arc::new(AtomicBool::new(false));
let handler = TestHandler {
invoked: invoked.clone(),
};
let mut registry = WebhookRegistry::new();
registry.add_registration(
WebhookRegistrationBuilder::new(
WebhookTopic::OrdersCreate,
WebhookDeliveryMethod::Http {
uri: "https://example.com/webhooks/orders".to_string(),
},
)
.handler(handler)
.build(),
);
assert!(registry.get_registration(&WebhookTopic::OrdersCreate).is_some());
assert!(registry.handlers.contains_key(&WebhookTopic::OrdersCreate));
}
#[test]
fn test_handler_lookup_by_topic_succeeds_for_registered_handler() {
let invoked = Arc::new(AtomicBool::new(false));
let handler = TestHandler {
invoked: invoked.clone(),
};
let mut registry = WebhookRegistry::new();
registry.add_registration(
WebhookRegistrationBuilder::new(
WebhookTopic::OrdersCreate,
WebhookDeliveryMethod::Http {
uri: "https://example.com/webhooks/orders".to_string(),
},
)
.handler(handler)
.build(),
);
let found_handler = registry.handlers.get(&WebhookTopic::OrdersCreate);
assert!(found_handler.is_some());
}
#[test]
fn test_handler_lookup_returns_none_for_topic_without_handler() {
let mut registry = WebhookRegistry::new();
registry.add_registration(
WebhookRegistrationBuilder::new(
WebhookTopic::OrdersCreate,
WebhookDeliveryMethod::Http {
uri: "https://example.com/webhooks/orders".to_string(),
},
)
.build(),
);
let found_handler = registry.handlers.get(&WebhookTopic::OrdersCreate);
assert!(found_handler.is_none());
}
#[tokio::test]
async fn test_process_returns_no_handler_for_topic_error() {
let mut registry = WebhookRegistry::new();
registry.add_registration(
WebhookRegistrationBuilder::new(
WebhookTopic::OrdersCreate,
WebhookDeliveryMethod::Http {
uri: "https://example.com/webhooks/orders".to_string(),
},
)
.build(),
);
let config = ShopifyConfig::builder()
.api_key(ApiKey::new("key").unwrap())
.api_secret_key(ApiSecretKey::new("secret").unwrap())
.build()
.unwrap();
let body = b"{}";
let hmac = compute_signature_base64(body, "secret");
let request = WebhookRequest::new(
body.to_vec(),
hmac,
Some("orders/create".to_string()),
Some("shop.myshopify.com".to_string()),
None,
None,
);
let result = registry.process(&config, &request).await;
assert!(result.is_err());
match result.unwrap_err() {
WebhookError::NoHandlerForTopic { topic } => {
assert_eq!(topic, "orders/create");
}
other => panic!("Expected NoHandlerForTopic, got: {:?}", other),
}
}
#[tokio::test]
async fn test_process_returns_payload_parse_error_for_invalid_json() {
let invoked = Arc::new(AtomicBool::new(false));
let handler = TestHandler {
invoked: invoked.clone(),
};
let mut registry = WebhookRegistry::new();
registry.add_registration(
WebhookRegistrationBuilder::new(
WebhookTopic::OrdersCreate,
WebhookDeliveryMethod::Http {
uri: "https://example.com/webhooks/orders".to_string(),
},
)
.handler(handler)
.build(),
);
let config = ShopifyConfig::builder()
.api_key(ApiKey::new("key").unwrap())
.api_secret_key(ApiSecretKey::new("secret").unwrap())
.build()
.unwrap();
let body = b"not valid json {{{";
let hmac = compute_signature_base64(body, "secret");
let request = WebhookRequest::new(
body.to_vec(),
hmac,
Some("orders/create".to_string()),
Some("shop.myshopify.com".to_string()),
None,
None,
);
let result = registry.process(&config, &request).await;
assert!(result.is_err());
match result.unwrap_err() {
WebhookError::PayloadParseError { message } => {
assert!(!message.is_empty());
}
other => panic!("Expected PayloadParseError, got: {:?}", other),
}
assert!(!invoked.load(Ordering::SeqCst));
}
#[tokio::test]
async fn test_process_invokes_handler_with_correct_context_and_payload() {
let invoked = Arc::new(AtomicBool::new(false));
let handler = TestHandler {
invoked: invoked.clone(),
};
let mut registry = WebhookRegistry::new();
registry.add_registration(
WebhookRegistrationBuilder::new(
WebhookTopic::OrdersCreate,
WebhookDeliveryMethod::Http {
uri: "https://example.com/webhooks/orders".to_string(),
},
)
.handler(handler)
.build(),
);
let config = ShopifyConfig::builder()
.api_key(ApiKey::new("key").unwrap())
.api_secret_key(ApiSecretKey::new("secret").unwrap())
.build()
.unwrap();
let body = br#"{"order_id": 123}"#;
let hmac = compute_signature_base64(body, "secret");
let request = WebhookRequest::new(
body.to_vec(),
hmac,
Some("orders/create".to_string()),
Some("shop.myshopify.com".to_string()),
None,
None,
);
let result = registry.process(&config, &request).await;
assert!(result.is_ok());
assert!(invoked.load(Ordering::SeqCst));
}
#[tokio::test]
async fn test_handler_error_propagation_through_process() {
let handler = ErrorHandler {
error_message: "Handler failed intentionally".to_string(),
};
let mut registry = WebhookRegistry::new();
registry.add_registration(
WebhookRegistrationBuilder::new(
WebhookTopic::OrdersCreate,
WebhookDeliveryMethod::Http {
uri: "https://example.com/webhooks/orders".to_string(),
},
)
.handler(handler)
.build(),
);
let config = ShopifyConfig::builder()
.api_key(ApiKey::new("key").unwrap())
.api_secret_key(ApiSecretKey::new("secret").unwrap())
.build()
.unwrap();
let body = br#"{"order_id": 123}"#;
let hmac = compute_signature_base64(body, "secret");
let request = WebhookRequest::new(
body.to_vec(),
hmac,
Some("orders/create".to_string()),
Some("shop.myshopify.com".to_string()),
None,
None,
);
let result = registry.process(&config, &request).await;
assert!(result.is_err());
match result.unwrap_err() {
WebhookError::ShopifyError { message } => {
assert_eq!(message, "Handler failed intentionally");
}
other => panic!("Expected ShopifyError, got: {:?}", other),
}
}
#[tokio::test]
async fn test_multiple_handlers_for_different_topics() {
let orders_invoked = Arc::new(AtomicBool::new(false));
let products_invoked = Arc::new(AtomicBool::new(false));
let orders_handler = TestHandler {
invoked: orders_invoked.clone(),
};
let products_handler = TestHandler {
invoked: products_invoked.clone(),
};
let mut registry = WebhookRegistry::new();
registry
.add_registration(
WebhookRegistrationBuilder::new(
WebhookTopic::OrdersCreate,
WebhookDeliveryMethod::Http {
uri: "https://example.com/webhooks/orders".to_string(),
},
)
.handler(orders_handler)
.build(),
)
.add_registration(
WebhookRegistrationBuilder::new(
WebhookTopic::ProductsCreate,
WebhookDeliveryMethod::Http {
uri: "https://example.com/webhooks/products".to_string(),
},
)
.handler(products_handler)
.build(),
);
let config = ShopifyConfig::builder()
.api_key(ApiKey::new("key").unwrap())
.api_secret_key(ApiSecretKey::new("secret").unwrap())
.build()
.unwrap();
let orders_body = br#"{"order_id": 123}"#;
let orders_hmac = compute_signature_base64(orders_body, "secret");
let orders_request = WebhookRequest::new(
orders_body.to_vec(),
orders_hmac,
Some("orders/create".to_string()),
Some("shop.myshopify.com".to_string()),
None,
None,
);
let result = registry.process(&config, &orders_request).await;
assert!(result.is_ok());
assert!(orders_invoked.load(Ordering::SeqCst));
assert!(!products_invoked.load(Ordering::SeqCst));
let products_body = br#"{"product_id": 456}"#;
let products_hmac = compute_signature_base64(products_body, "secret");
let products_request = WebhookRequest::new(
products_body.to_vec(),
products_hmac,
Some("products/create".to_string()),
Some("shop.myshopify.com".to_string()),
None,
None,
);
let result = registry.process(&config, &products_request).await;
assert!(result.is_ok());
assert!(products_invoked.load(Ordering::SeqCst));
}
#[tokio::test]
async fn test_handler_replacement_when_re_registering_same_topic() {
let first_invoked = Arc::new(AtomicBool::new(false));
let second_invoked = Arc::new(AtomicBool::new(false));
let first_handler = TestHandler {
invoked: first_invoked.clone(),
};
let second_handler = TestHandler {
invoked: second_invoked.clone(),
};
let mut registry = WebhookRegistry::new();
registry.add_registration(
WebhookRegistrationBuilder::new(
WebhookTopic::OrdersCreate,
WebhookDeliveryMethod::Http {
uri: "https://example.com/webhooks/orders".to_string(),
},
)
.handler(first_handler)
.build(),
);
registry.add_registration(
WebhookRegistrationBuilder::new(
WebhookTopic::OrdersCreate,
WebhookDeliveryMethod::Http {
uri: "https://example.com/webhooks/orders/v2".to_string(),
},
)
.handler(second_handler)
.build(),
);
let config = ShopifyConfig::builder()
.api_key(ApiKey::new("key").unwrap())
.api_secret_key(ApiSecretKey::new("secret").unwrap())
.build()
.unwrap();
let body = br#"{"order_id": 123}"#;
let hmac = compute_signature_base64(body, "secret");
let request = WebhookRequest::new(
body.to_vec(),
hmac,
Some("orders/create".to_string()),
Some("shop.myshopify.com".to_string()),
None,
None,
);
let result = registry.process(&config, &request).await;
assert!(result.is_ok());
assert!(!first_invoked.load(Ordering::SeqCst));
assert!(second_invoked.load(Ordering::SeqCst));
}
#[tokio::test]
async fn test_process_returns_invalid_hmac_for_bad_signature() {
let invoked = Arc::new(AtomicBool::new(false));
let handler = TestHandler {
invoked: invoked.clone(),
};
let mut registry = WebhookRegistry::new();
registry.add_registration(
WebhookRegistrationBuilder::new(
WebhookTopic::OrdersCreate,
WebhookDeliveryMethod::Http {
uri: "https://example.com/webhooks/orders".to_string(),
},
)
.handler(handler)
.build(),
);
let config = ShopifyConfig::builder()
.api_key(ApiKey::new("key").unwrap())
.api_secret_key(ApiSecretKey::new("secret").unwrap())
.build()
.unwrap();
let body = br#"{"order_id": 123}"#;
let hmac = compute_signature_base64(body, "wrong-secret");
let request = WebhookRequest::new(
body.to_vec(),
hmac,
Some("orders/create".to_string()),
Some("shop.myshopify.com".to_string()),
None,
None,
);
let result = registry.process(&config, &request).await;
assert!(result.is_err());
assert!(matches!(result.unwrap_err(), WebhookError::InvalidHmac));
assert!(!invoked.load(Ordering::SeqCst));
}
#[tokio::test]
async fn test_process_handles_unknown_topic() {
let invoked = Arc::new(AtomicBool::new(false));
let handler = TestHandler {
invoked: invoked.clone(),
};
let mut registry = WebhookRegistry::new();
registry.add_registration(
WebhookRegistrationBuilder::new(
WebhookTopic::OrdersCreate,
WebhookDeliveryMethod::Http {
uri: "https://example.com/webhooks/orders".to_string(),
},
)
.handler(handler)
.build(),
);
let config = ShopifyConfig::builder()
.api_key(ApiKey::new("key").unwrap())
.api_secret_key(ApiSecretKey::new("secret").unwrap())
.build()
.unwrap();
let body = br#"{"data": "test"}"#;
let hmac = compute_signature_base64(body, "secret");
let request = WebhookRequest::new(
body.to_vec(),
hmac,
Some("custom/unknown_topic".to_string()),
Some("shop.myshopify.com".to_string()),
None,
None,
);
let result = registry.process(&config, &request).await;
assert!(result.is_err());
match result.unwrap_err() {
WebhookError::NoHandlerForTopic { topic } => {
assert_eq!(topic, "custom/unknown_topic");
}
other => panic!("Expected NoHandlerForTopic, got: {:?}", other),
}
assert!(!invoked.load(Ordering::SeqCst));
}
}