#![deny(missing_docs)]
#[cfg(all(not(feature = "producer"), not(feature = "consumer")))]
compile_error!("at least one of feature \"producer\" and feature \"consumer\" must be enabled");
use chrono::{DateTime, Utc};
#[cfg(feature = "producer")]
use lazy_regex::regex_captures;
#[cfg(feature = "producer")]
use reqwest::header::{AUTHORIZATION, InvalidHeaderValue};
#[cfg(feature = "producer")]
use reqwest::{Client, Url};
#[cfg(feature = "producer")]
use serde::{Deserialize, Serialize};
#[cfg(feature = "producer")]
use std::borrow::Cow;
#[cfg(feature = "producer")]
use std::collections::{HashMap, HashSet};
#[cfg(feature = "producer")]
use std::fmt::Display;
#[cfg(feature = "producer")]
use std::str::FromStr;
#[cfg(feature = "producer")]
use tracing::{debug, error, trace};
#[cfg(feature = "producer")]
use url::ParseError;
#[cfg(feature = "producer")]
use uuid::Uuid;
#[cfg(feature = "consumer")]
use chrono::{Duration, OutOfRangeError};
#[cfg(feature = "consumer")]
use std::time::Duration as StdDuration;
#[cfg(feature = "consumer")]
mod signature;
/// Client used to send events and manage event types through Hook0's HTTP API.
///
/// It bundles an HTTP client whose default headers carry the `Authorization` header built by
/// [`Hook0Client::new`], the base URL of the API and the ID of the application that requests
/// are made for.
#[cfg(feature = "producer")]
#[derive(Debug, Clone)]
pub struct Hook0Client {
// HTTP client configured with the bearer token as a default header
client: Client,
// Base URL to which endpoint path segments are appended
api_url: Url,
// ID of the application attached to outgoing requests
application_id: Uuid,
}
#[cfg(feature = "producer")]
impl Hook0Client {
    /// Builds a client that authenticates every request with `token`.
    ///
    /// The token is sent as an `Authorization: Bearer <token>` default header. The header value
    /// is flagged as sensitive so that the token is not printed when the header is formatted
    /// with `Debug`.
    ///
    /// # Errors
    ///
    /// - [`Hook0ClientError::AuthHeader`] if `token` cannot be turned into a valid header value.
    /// - [`Hook0ClientError::ReqwestClient`] if the underlying HTTP client cannot be built.
    ///
    /// Both errors are logged before being returned.
    pub fn new(api_url: Url, application_id: Uuid, token: &str) -> Result<Self, Hook0ClientError> {
        let mut auth_value = reqwest::header::HeaderValue::from_str(&format!("Bearer {token}"))
            .map_err(|e| Hook0ClientError::AuthHeader(e).log_and_return())?;
        // This type derives `Debug`; make sure the secret cannot leak through header formatting.
        auth_value.set_sensitive(true);

        let headers = reqwest::header::HeaderMap::from_iter([(AUTHORIZATION, auth_value)]);
        let client = Client::builder()
            .default_headers(headers)
            .build()
            .map_err(|e| Hook0ClientError::ReqwestClient(e).log_and_return())?;

        Ok(Self {
            client,
            api_url,
            application_id,
        })
    }

    /// Returns the base URL of the API this client talks to.
    pub fn api_url(&self) -> &Url {
        &self.api_url
    }

    /// Returns the ID of the application this client makes requests for.
    pub fn application_id(&self) -> &Uuid {
        &self.application_id
    }

    /// Builds the URL of an endpoint by appending `segments` to the API base URL.
    ///
    /// A failure is logged and returned as [`Hook0ClientError::Url`].
    fn mk_url(&self, segments: &[&str]) -> Result<Url, Hook0ClientError> {
        append_url_segments(&self.api_url, segments)
            .map_err(|e| Hook0ClientError::Url(e).log_and_return())
    }

    /// Sends `event` to the API's `event` endpoint and returns the event ID found in the
    /// response.
    ///
    /// # Errors
    ///
    /// - [`Hook0ClientError::Url`] if the endpoint URL cannot be built.
    /// - [`Hook0ClientError::EventSending`] if the request cannot be sent, if the API answers
    ///   with an error status (the response body is attached when it can be read) or if the
    ///   response cannot be decoded.
    ///
    /// Every error is logged before being returned.
    pub async fn send_event(&self, event: &Event<'_>) -> Result<Uuid, Hook0ClientError> {
        let event_ingestion_url = self.mk_url(&["event"])?;
        let full_event = FullEvent::from_event(event, &self.application_id);

        // All failure paths below report the same variant; only the cause and body differ.
        let event_id = full_event.event_id.copied();
        let sending_error = move |error: reqwest::Error, body: Option<String>| {
            Hook0ClientError::EventSending {
                event_id,
                error,
                body,
            }
            .log_and_return()
        };

        let res = self
            .client
            .post(event_ingestion_url)
            .json(&full_event)
            .send()
            .await
            .map_err(|e| sending_error(e, None))?;

        match res.error_for_status_ref() {
            Ok(_) => {
                #[derive(Debug, Deserialize)]
                struct Response {
                    event_id: Uuid,
                }

                res.json::<Response>()
                    .await
                    .map(|response| response.event_id)
                    .map_err(|e| sending_error(e, None))
            }
            Err(e) => {
                // Best effort: the body often explains why the API rejected the event.
                let body = res.text().await.ok();
                Err(sending_error(e, body))
            }
        }
    }

    /// Makes sure every event type of `event_types` exists for the application, creating the
    /// ones that are missing.
    ///
    /// Names must follow the `service.resource_type.verb` syntax; they are all validated before
    /// any request is made. The list of existing event types is then fetched and each name that
    /// is not part of it is created. A name that appears several times in `event_types` is
    /// created only once.
    ///
    /// Returns the names of the event types that were created, in the order they were created.
    ///
    /// # Errors
    ///
    /// - [`Hook0ClientError::InvalidEventType`] if a name does not have a valid syntax.
    /// - [`Hook0ClientError::Url`] if the endpoint URL cannot be built.
    /// - [`Hook0ClientError::GetAvailableEventTypes`] if listing existing event types fails.
    /// - [`Hook0ClientError::CreatingEventType`] if creating an event type fails; event types
    ///   created before the failure are not rolled back.
    pub async fn upsert_event_types(
        &self,
        event_types: &[&str],
    ) -> Result<Vec<String>, Hook0ClientError> {
        let structured_event_types = event_types
            .iter()
            .map(|name| {
                EventType::from_str(name)
                    .map_err(|_| Hook0ClientError::InvalidEventType(name.to_string()))
            })
            .collect::<Result<Vec<EventType>, Hook0ClientError>>()?;
        let event_types_url = self.mk_url(&["event_types"])?;

        #[derive(Debug, Deserialize)]
        struct ApiEventType {
            event_type_name: String,
        }

        trace!("Getting the list of available event types");
        let mut known_event_types = self
            .client
            .get(event_types_url.as_str())
            .query(&[("application_id", self.application_id())])
            .send()
            .await
            .map_err(Hook0ClientError::GetAvailableEventTypes)?
            .error_for_status()
            .map_err(Hook0ClientError::GetAvailableEventTypes)?
            .json::<Vec<ApiEventType>>()
            .await
            .map_err(Hook0ClientError::GetAvailableEventTypes)?
            .into_iter()
            .map(|et| et.event_type_name)
            .collect::<HashSet<String>>();
        debug!(
            "There are currently {} event types",
            known_event_types.len(),
        );

        #[derive(Debug, Serialize)]
        struct ApiEventTypePost {
            application_id: Uuid,
            service: String,
            resource_type: String,
            verb: String,
        }

        let mut added_event_types = vec![];
        for event_type in structured_event_types {
            let event_type_name = event_type.to_string();
            if known_event_types.contains(&event_type_name) {
                continue;
            }

            debug!("Creating the '{event_type}' event type");
            let body = ApiEventTypePost {
                application_id: self.application_id,
                service: event_type.service,
                resource_type: event_type.resource_type,
                verb: event_type.verb,
            };
            self.client
                .post(event_types_url.as_str())
                .json(&body)
                .send()
                .await
                .map_err(|e| Hook0ClientError::CreatingEventType {
                    event_type_name: event_type_name.clone(),
                    error: e,
                })?
                .error_for_status()
                .map_err(|e| Hook0ClientError::CreatingEventType {
                    event_type_name: event_type_name.clone(),
                    error: e,
                })?;

            // Remember what was just created so a repeated name is not posted a second time.
            known_event_types.insert(event_type_name.clone());
            added_event_types.push(event_type_name);
        }
        debug!("{} new event types were created", added_event_types.len());

        Ok(added_event_types)
    }
}
/// Checks the signature of a received webhook, using `current_time` as the present instant.
///
/// `signature` is the content of the signature header, `payload` the raw request body and
/// `headers` the name/value pairs received with the request. The headers listed in the
/// signature are looked up in `headers` and passed, together with `payload` and
/// `subscription_secret`, to the signature check. Once the signature is known to match, the
/// webhook is rejected if more than `tolerance` elapsed between the signature's timestamp and
/// `current_time`. A timestamp later than `current_time` is not rejected by this check.
///
/// # Errors
///
/// - [`Hook0ClientError::InvalidSignature`] if the signature cannot be parsed, does not match,
///   or carries a timestamp that cannot be represented as a date.
/// - [`Hook0ClientError::InvalidHeaderName`] if a provided header name is not a valid one.
/// - [`Hook0ClientError::MissingHeader`] if a header listed in the signature was not provided.
/// - [`Hook0ClientError::InvalidHeaderValue`] if the value of such a header is not valid UTF-8.
/// - [`Hook0ClientError::InvalidTolerance`] if `tolerance` is out of range.
/// - [`Hook0ClientError::ExpiredWebhook`] if the webhook was signed too long ago.
#[cfg(feature = "consumer")]
pub fn verify_webhook_signature_with_current_time<
    HeaderKey: AsRef<[u8]>,
    HeaderValue: AsRef<[u8]>,
>(
    signature: &str,
    payload: &[u8],
    headers: &[(HeaderKey, HeaderValue)],
    subscription_secret: &str,
    tolerance: StdDuration,
    current_time: DateTime<Utc>,
) -> Result<(), Hook0ClientError> {
    let parsed_sig =
        signature::Signature::parse(signature).map_err(|_| Hook0ClientError::InvalidSignature)?;

    // Index the provided headers by their parsed name so they can be matched with the
    // names listed in the signature.
    let headers_with_parsed_name = headers
        .iter()
        .map(|(raw_name, value)| {
            http::HeaderName::from_bytes(raw_name.as_ref())
                .map(|name| (name, value))
                .map_err(|error| Hook0ClientError::InvalidHeaderName {
                    header_name: String::from_utf8_lossy(raw_name.as_ref()).into_owned(),
                    error,
                })
        })
        .collect::<Result<std::collections::HashMap<_, _>, _>>()?;

    // Gather the signed header values, in the order the signature lists them.
    let mut headers_vec = Vec::new();
    for expected in parsed_sig.h.iter() {
        let raw_value = headers_with_parsed_name
            .get(expected)
            .ok_or_else(|| Hook0ClientError::MissingHeader(expected.to_owned()))?;
        let value = String::from_utf8(raw_value.as_ref().to_vec()).map_err(|error| {
            Hook0ClientError::InvalidHeaderValue {
                header_name: expected.to_owned(),
                header_value: String::from_utf8_lossy(raw_value.as_ref()).into_owned(),
                error,
            }
        })?;
        headers_vec.push(value);
    }

    if !parsed_sig.verify(payload, &headers_vec, subscription_secret) {
        return Err(Hook0ClientError::InvalidSignature);
    }

    let signed_at = DateTime::from_timestamp(parsed_sig.timestamp, 0)
        .ok_or(Hook0ClientError::InvalidSignature)?;
    let tolerance = Duration::from_std(tolerance).map_err(Hook0ClientError::InvalidTolerance)?;

    if (current_time - signed_at) > tolerance {
        Err(Hook0ClientError::ExpiredWebhook {
            signed_at,
            tolerance,
            current_time,
        })
    } else {
        Ok(())
    }
}
/// Checks the signature of a received webhook against the current system time.
///
/// This reads the clock with `Utc::now()` and forwards every argument to
/// [`verify_webhook_signature_with_current_time`], which defines the checks performed and the
/// errors returned.
#[cfg(feature = "consumer")]
pub fn verify_webhook_signature<HeaderKey: AsRef<[u8]>, HeaderValue: AsRef<[u8]>>(
    signature: &str,
    payload: &[u8],
    headers: &[(HeaderKey, HeaderValue)],
    subscription_secret: &str,
    tolerance: StdDuration,
) -> Result<(), Hook0ClientError> {
    let now = Utc::now();
    verify_webhook_signature_with_current_time(
        signature,
        payload,
        headers,
        subscription_secret,
        tolerance,
        now,
    )
}
/// Structured form of an event type name.
///
/// It is parsed from, and displayed as, the three parts joined with dots:
/// `service.resource_type.verb`.
#[cfg(feature = "producer")]
#[derive(Debug, Serialize, PartialEq, Eq)]
struct EventType {
// First part of the name
service: String,
// Second part of the name
resource_type: String,
// Third part of the name
verb: String,
}
#[cfg(feature = "producer")]
impl FromStr for EventType {
    type Err = ();

    /// Parses a `service.resource_type.verb` name.
    ///
    /// Each of the three parts must be a non-empty run of ASCII letters, digits or
    /// underscores (matching is case-insensitive); anything else yields `Err(())`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        regex_captures!("^([A-Z0-9_]+)[.]([A-Z0-9_]+)[.]([A-Z0-9_]+)$"i, s)
            .map(|(_, service, resource_type, verb)| Self {
                service: service.to_owned(),
                resource_type: resource_type.to_owned(),
                verb: verb.to_owned(),
            })
            .ok_or(())
    }
}
#[cfg(feature = "producer")]
impl Display for EventType {
    /// Writes the event type as its three parts joined with dots.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            service,
            resource_type,
            verb,
        } = self;
        write!(f, "{}.{}.{}", service, resource_type, verb)
    }
}
/// An event as provided by the caller of [`Hook0Client::send_event`].
#[cfg(feature = "producer")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Event<'a> {
/// Optional ID of the event; when `None`, no `event_id` is included in the request sent to
/// the API.
pub event_id: Option<&'a Uuid>,
/// Name of the event's type.
///
/// NOTE(review): presumably expected to follow the `service.resource_type.verb` syntax used
/// for event types; this is not checked when the event is sent — confirm.
pub event_type: &'a str,
/// Payload of the event as text, either borrowed or owned.
pub payload: Cow<'a, str>,
/// Content type of `payload`.
///
/// NOTE(review): presumably a MIME type such as `application/json` — confirm.
pub payload_content_type: &'a str,
/// Optional key/value pairs attached to the event. They are turned into a map when the event
/// is sent, so a key that appears several times keeps only one of its values.
pub metadata: Option<Vec<(String, String)>>,
/// Date and time at which the event occurred; when `None`, the current time is used at the
/// moment the event is sent.
pub occurred_at: Option<DateTime<Utc>>,
/// Key/value labels attached to the event. They are turned into a map when the event is
/// sent, so a key that appears several times keeps only one of its values.
pub labels: Vec<(String, String)>,
}
/// Serializable form of an [`Event`], completed with the application ID and a definite
/// occurrence date. This is the JSON body posted by `Hook0Client::send_event`.
#[cfg(feature = "producer")]
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
struct FullEvent<'a> {
// ID of the application the event belongs to
pub application_id: Uuid,
// Left out of the serialized output when `None`
#[serde(skip_serializing_if = "Option::is_none")]
pub event_id: Option<&'a Uuid>,
// Borrowed from the source `Event`
pub event_type: &'a str,
// Borrowed from the source `Event`
pub payload: &'a str,
// Borrowed from the source `Event`
pub payload_content_type: &'a str,
// Map built from the source event's metadata pairs, if any
pub metadata: Option<HashMap<String, String>>,
// Always set: the source event's date, or the conversion time as a fallback
pub occurred_at: DateTime<Utc>,
// Map built from the source event's label pairs
pub labels: HashMap<String, String>,
}
#[cfg(feature = "producer")]
impl<'a> FullEvent<'a> {
    /// Builds the serializable form of `event` for the application `application_id`.
    ///
    /// Textual fields are borrowed from `event`. Metadata and labels are cloned into maps, and
    /// the occurrence date falls back to the current time when `event` does not provide one.
    pub fn from_event(event: &'a Event, application_id: &Uuid) -> Self {
        let occurred_at = event.occurred_at.unwrap_or_else(Utc::now);
        let metadata = event
            .metadata
            .as_ref()
            .map(|pairs| pairs.iter().cloned().collect::<HashMap<_, _>>());
        let labels = event.labels.iter().cloned().collect::<HashMap<_, _>>();

        Self {
            application_id: *application_id,
            event_id: event.event_id,
            event_type: event.event_type,
            payload: event.payload.as_ref(),
            payload_content_type: event.payload_content_type,
            metadata,
            occurred_at,
            labels,
        }
    }
}
/// Every error that can be returned by this crate.
///
/// Which variants exist depends on the enabled features: variants about sending data to the
/// API require the `producer` feature, variants about checking webhook signatures require the
/// `consumer` feature.
#[derive(Debug, thiserror::Error)]
pub enum Hook0ClientError {
/// The `Authorization` header could not be built from the provided token.
#[cfg(feature = "producer")]
#[error("Could not build auth header: {0}")]
AuthHeader(InvalidHeaderValue),
/// The underlying reqwest HTTP client could not be built.
#[cfg(feature = "producer")]
#[error("Could not build reqwest HTTP client: {0}")]
ReqwestClient(reqwest::Error),
/// The URL of an API endpoint could not be built.
#[cfg(feature = "producer")]
#[error("Could not create a valid URL to request Hook0's API: {0}")]
Url(ParseError),
/// An event could not be sent, was rejected by the API, or the API's response could not be
/// decoded.
#[cfg(feature = "producer")]
#[error("Sending event{} failed: {error} [body={}]", event_id.map(|id| format!(" {id}")).unwrap_or_else(String::new), body.as_deref().unwrap_or(""))]
EventSending {
/// ID of the event, if one was provided by the caller.
event_id: Option<Uuid>,
/// Underlying HTTP error.
error: reqwest::Error,
/// Body of the API's response, when the API answered with an error status and the body
/// could be read.
body: Option<String>,
},
/// An event type name does not follow the `service.resource_type.verb` syntax.
#[cfg(feature = "producer")]
#[error("Provided event type '{0}' does not have a valid syntax (service.resource_type.verb)")]
InvalidEventType(String),
/// The list of existing event types could not be fetched or decoded.
#[cfg(feature = "producer")]
#[error("Getting available event types failed: {0}")]
GetAvailableEventTypes(reqwest::Error),
/// An event type could not be created.
#[cfg(feature = "producer")]
#[error("Creating event type '{event_type_name}' failed: {error}")]
CreatingEventType {
/// Name of the event type that was being created.
event_type_name: String,
/// Underlying HTTP error.
error: reqwest::Error,
},
/// The signature could not be parsed, does not match the webhook, or carries a timestamp
/// that cannot be represented as a date.
#[cfg(feature = "consumer")]
#[error("Invalid signature")]
InvalidSignature,
/// The signature is valid but was issued longer ago than the accepted tolerance.
#[cfg(feature = "consumer")]
#[error(
"The webhook has expired because it was sent too long ago (signed_at={signed_at}, tolerance={tolerance}, current_time={current_time})"
)]
ExpiredWebhook {
/// Date and time found in the signature.
signed_at: DateTime<Utc>,
/// Maximum accepted delay between `signed_at` and `current_time`.
tolerance: Duration,
/// Date and time the signature was checked against.
current_time: DateTime<Utc>,
},
/// The signature header could not be parsed; the message describes the problem.
///
/// NOTE(review): not constructed in this file, presumably raised by the `signature` module —
/// confirm.
#[cfg(feature = "consumer")]
#[error("Could not parse signature header: {0}")]
SignatureHeaderParsing(String),
/// The timestamp found in the signature is not a valid integer.
///
/// NOTE(review): not constructed in this file, presumably raised by the `signature` module —
/// confirm.
#[cfg(feature = "consumer")]
#[error("Could not parse timestamp `{timestamp}` in signature: {error}")]
TimestampParsing {
/// Timestamp as found in the signature.
timestamp: String,
/// Underlying integer parsing error.
error: std::num::ParseIntError,
},
/// The v0 signature is not valid hexadecimal.
///
/// NOTE(review): not constructed in this file, presumably raised by the `signature` module —
/// confirm.
#[cfg(feature = "consumer")]
#[error("Could not parse v0 signature `{signature}`: {error}")]
V0SignatureParsing {
/// Signature value as found in the header.
signature: String,
/// Underlying hexadecimal decoding error.
error: hex::FromHexError,
},
/// A header name listed in the `h` field of the signature is not a valid header name.
///
/// NOTE(review): not constructed in this file, presumably raised by the `signature` module —
/// confirm.
#[cfg(feature = "consumer")]
#[error("Could not parse header name `{header}` in `h` field: {error}")]
HeaderNameParsing {
/// Header name as found in the `h` field.
header: String,
/// Underlying header name parsing error.
error: http::header::InvalidHeaderName,
},
/// The v1 signature is not valid hexadecimal.
///
/// NOTE(review): not constructed in this file, presumably raised by the `signature` module —
/// confirm.
#[cfg(feature = "consumer")]
#[error("Could not parse v1 signature `{signature}`: {error}")]
V1SignatureParsing {
/// Signature value as found in the header.
signature: String,
/// Underlying hexadecimal decoding error.
error: hex::FromHexError,
},
/// A header listed in the signature is absent from the headers provided for verification.
#[cfg(feature = "consumer")]
#[error("The `{0}` header present in the webhook's signature was not provided with a value")]
MissingHeader(http::HeaderName),
/// A header provided for verification has a name that is not a valid header name.
#[cfg(feature = "consumer")]
#[error("Provided `{header_name}` has an invalid header name: {error}")]
InvalidHeaderName {
/// Provided name, with invalid UTF-8 sequences replaced.
header_name: String,
/// Underlying header name parsing error.
error: http::header::InvalidHeaderName,
},
/// The value of a header listed in the signature is not valid UTF-8.
#[cfg(feature = "consumer")]
#[error("Provided `{header_name}` has an invalid header value `{header_value}`: {error}")]
InvalidHeaderValue {
/// Name of the header whose value is invalid.
header_name: http::HeaderName,
/// Provided value, with invalid UTF-8 sequences replaced.
header_value: String,
/// Underlying UTF-8 decoding error.
error: std::string::FromUtf8Error,
},
/// The tolerance is out of the range supported by chrono's `Duration`.
#[cfg(feature = "consumer")]
#[error("Invalid tolerance Duration: {0}")]
InvalidTolerance(OutOfRangeError),
}
#[cfg(feature = "producer")]
impl Hook0ClientError {
/// Logs the error's message at the `error` level through `tracing`, then gives the error
/// back unchanged so the call can be chained where the error is built.
pub fn log_and_return(self) -> Self {
error!("{self}");
self
}
}
/// Returns a copy of `base_url` whose path is extended with `segments`.
///
/// A trailing slash on the base path is dropped before appending, so no empty segment is
/// produced between the base path and the first new segment. Only the path is touched: the
/// scheme, authority, query and fragment of `base_url` are left as they are. Each segment is
/// appended as a single path segment and percent-encoded when needed.
///
/// # Errors
///
/// Returns [`url::ParseError::RelativeUrlWithCannotBeABaseBase`] when `base_url` is a
/// cannot-be-a-base URL (such as `mailto:` or `data:`), as such a URL has no path segments.
#[cfg(feature = "producer")]
fn append_url_segments(base_url: &Url, segments: &[&str]) -> Result<Url, url::ParseError> {
    let mut extended = base_url.clone();
    extended
        .path_segments_mut()
        .map_err(|()| url::ParseError::RelativeUrlWithCannotBeABaseBase)?
        .pop_if_empty()
        .extend(segments);
    Ok(extended)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// An event type is displayed as its three parts joined with dots.
    #[cfg(feature = "producer")]
    #[test]
    fn displaying_event_type() {
        let et = EventType {
            service: "service".to_owned(),
            resource_type: "resource".to_owned(),
            verb: "verb".to_owned(),
        };
        assert_eq!(et.to_string(), "service.resource.verb");
    }

    /// Parsing the displayed form of an event type gives the event type back.
    #[cfg(feature = "producer")]
    #[test]
    fn parsing_valid_event_type() {
        let et = EventType {
            service: "service".to_owned(),
            resource_type: "resource".to_owned(),
            verb: "verb".to_owned(),
        };
        assert_eq!(EventType::from_str(&et.to_string()), Ok(et));
    }

    /// A name with only two parts is rejected.
    #[cfg(feature = "producer")]
    #[test]
    fn parsing_invalid_event_type() {
        assert_eq!(EventType::from_str("test.test"), Err(()));
    }

    /// A valid v0 signature is accepted against the system clock when the tolerance is huge.
    #[cfg(feature = "consumer")]
    #[test]
    fn verifying_valid_signature_v0() {
        let signature =
            "t=1636936200,v0=1b3d69df55f1e52f05224ba94a5162abeb17ef52cd7f4948c390f810d6a87e98";
        let payload = "hello !".as_bytes();
        let subscription_secret = "secret";
        let tolerance = StdDuration::from_secs((i64::MAX / 1000) as u64);
        assert!(
            verify_webhook_signature::<&str, &str>(
                signature,
                payload,
                &[],
                subscription_secret,
                tolerance
            )
            .is_ok()
        );
    }

    /// A valid v0 signature is accepted when the provided current time is within a realistic
    /// tolerance of the signature's timestamp.
    #[cfg(feature = "consumer")]
    #[test]
    fn verifying_valid_signature_v0_with_current_time() {
        let signature =
            "t=1636936200,v0=1b3d69df55f1e52f05224ba94a5162abeb17ef52cd7f4948c390f810d6a87e98";
        let payload = "hello !".as_bytes();
        let subscription_secret = "secret";
        let tolerance = StdDuration::from_secs(300);
        // One minute after the `t=` timestamp of the signature.
        let current_time =
            DateTime::from_timestamp(1636936260, 0).expect("timestamp is in the supported range");
        assert!(
            verify_webhook_signature_with_current_time::<&str, &str>(
                signature,
                payload,
                &[],
                subscription_secret,
                tolerance,
                current_time
            )
            .is_ok()
        );
    }

    /// A valid but old v0 signature is rejected as expired, not as invalid.
    #[cfg(feature = "consumer")]
    #[test]
    fn verifying_expired_signature_v0() {
        let signature =
            "t=1636936200,v0=1b3d69df55f1e52f05224ba94a5162abeb17ef52cd7f4948c390f810d6a87e98";
        let payload = "hello !".as_bytes();
        let subscription_secret = "secret";
        let tolerance = StdDuration::from_secs(300);
        assert!(matches!(
            verify_webhook_signature::<&str, &str>(
                signature,
                payload,
                &[],
                subscription_secret,
                tolerance
            ),
            Err(Hook0ClientError::ExpiredWebhook { .. })
        ));
    }

    /// A valid v1 signature is accepted against the system clock when the tolerance is huge.
    #[cfg(feature = "consumer")]
    #[test]
    fn verifying_valid_signature_v1() {
        let signature = "t=1636936200,h=x-test x-test2,v1=493c35f05443fdb74cb99fd4f00e0e7653c2ab6b24fbc97f4a7bd4d56b31758a";
        let payload = "hello !".as_bytes();
        let header_values = [("x-test", "val1"), ("x-test2", "val2")];
        let subscription_secret = "secret";
        let tolerance = StdDuration::from_secs((i64::MAX / 1000) as u64);
        assert!(
            verify_webhook_signature::<&str, &str>(
                signature,
                payload,
                &header_values,
                subscription_secret,
                tolerance
            )
            .is_ok()
        );
    }

    /// A valid v1 signature is accepted when the provided current time is within a realistic
    /// tolerance of the signature's timestamp.
    #[cfg(feature = "consumer")]
    #[test]
    fn verifying_valid_signature_v1_with_current_time() {
        let signature = "t=1636936200,h=x-test x-test2,v1=493c35f05443fdb74cb99fd4f00e0e7653c2ab6b24fbc97f4a7bd4d56b31758a";
        let payload = "hello !".as_bytes();
        let header_values = [("x-test", "val1"), ("x-test2", "val2")];
        let subscription_secret = "secret";
        let tolerance = StdDuration::from_secs(300);
        // One minute after the `t=` timestamp of the signature.
        let current_time =
            DateTime::from_timestamp(1636936260, 0).expect("timestamp is in the supported range");
        assert!(
            verify_webhook_signature_with_current_time::<&str, &str>(
                signature,
                payload,
                &header_values,
                subscription_secret,
                tolerance,
                current_time
            )
            .is_ok()
        );
    }
}