1#![deny(missing_docs)]
3
4#[cfg(all(not(feature = "producer"), not(feature = "consumer")))]
8compile_error!("at least one of feature \"producer\" and feature \"consumer\" must be enabled");
9
10use chrono::{DateTime, Utc};
11
12#[cfg(feature = "producer")]
13use lazy_regex::regex_captures;
14#[cfg(feature = "producer")]
15use reqwest::header::{AUTHORIZATION, InvalidHeaderValue};
16#[cfg(feature = "producer")]
17use reqwest::{Client, Url};
18#[cfg(feature = "producer")]
19use serde::{Deserialize, Serialize};
20#[cfg(feature = "producer")]
21use std::borrow::Cow;
22#[cfg(feature = "producer")]
23use std::collections::{HashMap, HashSet};
24#[cfg(feature = "producer")]
25use std::fmt::Display;
26#[cfg(feature = "producer")]
27use std::str::FromStr;
28#[cfg(feature = "producer")]
29use tracing::{debug, error, trace};
30#[cfg(feature = "producer")]
31use url::ParseError;
32#[cfg(feature = "producer")]
33use uuid::Uuid;
34
35#[cfg(feature = "consumer")]
36use chrono::{Duration, OutOfRangeError};
37#[cfg(feature = "consumer")]
38use std::time::Duration as StdDuration;
39#[cfg(feature = "consumer")]
40mod signature;
41
#[cfg(feature = "producer")]
#[derive(Debug, Clone)]
/// Client used to send events and to register event types through a Hook0 API.
///
/// A client is created with [`Hook0Client::new`] and always acts on behalf of
/// the single application whose identifier it was given.
pub struct Hook0Client {
    /// HTTP client used for every request; [`Hook0Client::new`] builds it with a
    /// default `Authorization` header.
    client: Client,
    /// Base URL to which endpoint path segments are appended.
    api_url: Url,
    /// Identifier of the application that requests are issued for.
    application_id: Uuid,
}
52
#[cfg(feature = "producer")]
impl Hook0Client {
    /// Creates a client bound to one Hook0 API base URL and one application.
    ///
    /// `token` is sent as `Authorization: Bearer <token>` with every request
    /// issued through the returned client.
    ///
    /// # Errors
    ///
    /// - [`Hook0ClientError::AuthHeader`] if `token` cannot be turned into a header value.
    /// - [`Hook0ClientError::ReqwestClient`] if the HTTP client cannot be built.
    ///
    /// Both errors are logged before being returned.
    pub fn new(api_url: Url, application_id: Uuid, token: &str) -> Result<Self, Hook0ClientError> {
        let mut auth_value = reqwest::header::HeaderValue::from_str(&format!("Bearer {token}"))
            .map_err(|e| Hook0ClientError::AuthHeader(e).log_and_return())?;
        // This struct derives `Debug`; flag the token as sensitive so that the
        // header value is redacted instead of printed when it is debug-formatted.
        auth_value.set_sensitive(true);

        let headers = reqwest::header::HeaderMap::from_iter([(AUTHORIZATION, auth_value)]);
        let authenticated_client = Client::builder()
            .default_headers(headers)
            .build()
            .map_err(|e| Hook0ClientError::ReqwestClient(e).log_and_return())?;

        Ok(Self {
            api_url,
            client: authenticated_client,
            application_id,
        })
    }

    /// Returns the base URL of the API this client talks to.
    pub fn api_url(&self) -> &Url {
        &self.api_url
    }

    /// Returns the identifier of the application this client acts for.
    pub fn application_id(&self) -> &Uuid {
        &self.application_id
    }

    /// Builds an endpoint URL by appending `segments` to the API base URL.
    ///
    /// A failure is logged and returned as [`Hook0ClientError::Url`].
    fn mk_url(&self, segments: &[&str]) -> Result<Url, Hook0ClientError> {
        append_url_segments(&self.api_url, segments)
            .map_err(|e| Hook0ClientError::Url(e).log_and_return())
    }

    /// Sends `event` to the API's `event` endpoint.
    ///
    /// Returns the `event_id` found in the API's response.
    ///
    /// # Errors
    ///
    /// Returns [`Hook0ClientError::Url`] if the endpoint URL cannot be built, and
    /// [`Hook0ClientError::EventSending`] if the request fails, if the API
    /// answers with an error status (the response body is then attached when it
    /// can be read), or if the response cannot be decoded. Errors are logged
    /// before being returned.
    pub async fn send_event(&self, event: &Event<'_>) -> Result<Uuid, Hook0ClientError> {
        let event_ingestion_url = self.mk_url(&["event"])?;
        let full_event = FullEvent::from_event(event, &self.application_id);

        // Every failure below is reported the same way; only the underlying
        // error and the optional response body differ.
        let event_id = full_event.event_id.copied();
        let sending_error = move |error: reqwest::Error, body: Option<String>| {
            Hook0ClientError::EventSending {
                event_id,
                error,
                body,
            }
            .log_and_return()
        };

        let res = self
            .client
            .post(event_ingestion_url)
            .json(&full_event)
            .send()
            .await
            .map_err(|e| sending_error(e, None))?;

        match res.error_for_status_ref() {
            Ok(_) => {
                #[derive(Debug, Deserialize)]
                struct Response {
                    event_id: Uuid,
                }
                res.json::<Response>()
                    .await
                    .map(|response| response.event_id)
                    .map_err(|e| sending_error(e, None))
            }
            Err(e) => {
                // Keep whatever the API said about the failure; an unreadable
                // body is simply left out.
                let body = res.text().await.ok();
                Err(sending_error(e, body))
            }
        }
    }

    /// Makes sure every event type in `event_types` exists for the application.
    ///
    /// Each name must follow the `service.resource_type.verb` syntax. The
    /// currently available event types are fetched once, then every requested
    /// type that is not among them is created. A name that appears several
    /// times in `event_types` is created only once.
    ///
    /// Returns the names of the event types that were created by this call.
    ///
    /// # Errors
    ///
    /// - [`Hook0ClientError::InvalidEventType`] if a name has an invalid syntax
    ///   (checked before any request is sent).
    /// - [`Hook0ClientError::Url`] if the endpoint URL cannot be built.
    /// - [`Hook0ClientError::GetAvailableEventTypes`] if listing existing types fails.
    /// - [`Hook0ClientError::CreatingEventType`] if creating a type fails; types
    ///   created earlier in the same call are not rolled back.
    pub async fn upsert_event_types(
        &self,
        event_types: &[&str],
    ) -> Result<Vec<String>, Hook0ClientError> {
        #[derive(Debug, Deserialize)]
        struct ApiEventType {
            event_type_name: String,
        }

        #[derive(Debug, Serialize)]
        struct ApiEventTypePost {
            application_id: Uuid,
            service: String,
            resource_type: String,
            verb: String,
        }

        let structured_event_types = event_types
            .iter()
            .map(|name| {
                EventType::from_str(name)
                    .map_err(|_| Hook0ClientError::InvalidEventType(name.to_string()))
            })
            .collect::<Result<Vec<EventType>, Hook0ClientError>>()?;

        let event_types_url = self.mk_url(&["event_types"])?;

        trace!("Getting the list of available event types");
        let mut known_event_types = self
            .client
            .get(event_types_url.as_str())
            .query(&[("application_id", self.application_id())])
            .send()
            .await
            .map_err(Hook0ClientError::GetAvailableEventTypes)?
            .error_for_status()
            .map_err(Hook0ClientError::GetAvailableEventTypes)?
            .json::<Vec<ApiEventType>>()
            .await
            .map_err(Hook0ClientError::GetAvailableEventTypes)?
            .into_iter()
            .map(|et| et.event_type_name)
            .collect::<HashSet<String>>();
        debug!(
            "There are currently {} event types",
            known_event_types.len(),
        );

        let mut added_event_types = vec![];
        for event_type in structured_event_types {
            let event_type_name = event_type.to_string();
            if known_event_types.contains(&event_type_name) {
                continue;
            }
            debug!("Creating the '{event_type_name}' event type");

            let body = ApiEventTypePost {
                application_id: self.application_id,
                service: event_type.service,
                resource_type: event_type.resource_type,
                verb: event_type.verb,
            };

            self.client
                .post(event_types_url.as_str())
                .json(&body)
                .send()
                .await
                .map_err(|e| Hook0ClientError::CreatingEventType {
                    event_type_name: event_type_name.clone(),
                    error: e,
                })?
                .error_for_status()
                .map_err(|e| Hook0ClientError::CreatingEventType {
                    event_type_name: event_type_name.clone(),
                    error: e,
                })?;

            // Remember the new type so that a repeated name in `event_types`
            // does not trigger a second creation request.
            known_event_types.insert(event_type_name.clone());
            added_event_types.push(event_type_name);
        }
        debug!("{} new event types were created", added_event_types.len());

        Ok(added_event_types)
    }
}
238
#[cfg(feature = "consumer")]
/// Verifies a webhook signature against an explicitly provided current time.
///
/// The checks run in this order:
///
/// 1. `signature` is parsed;
/// 2. the names in `headers` are parsed as HTTP header names;
/// 3. every header listed in the signature is looked up in `headers` and its
///    value decoded as UTF-8;
/// 4. the signature is verified against `payload`, those header values and
///    `subscription_secret`;
/// 5. the signature's timestamp is compared with `current_time`: the webhook
///    is rejected when it is older than `tolerance`.
///
/// When `headers` contains the same name several times, the last value is used.
/// A signature whose timestamp lies after `current_time` is never reported as
/// expired.
///
/// # Errors
///
/// - [`Hook0ClientError::InvalidSignature`] if the signature cannot be parsed,
///   does not match, or carries a timestamp that cannot be represented.
/// - [`Hook0ClientError::InvalidHeaderName`] if a provided header name is invalid.
/// - [`Hook0ClientError::MissingHeader`] if a header listed in the signature was
///   not provided.
/// - [`Hook0ClientError::InvalidHeaderValue`] if a needed header value is not UTF-8.
/// - [`Hook0ClientError::InvalidTolerance`] if `tolerance` is out of range.
/// - [`Hook0ClientError::ExpiredWebhook`] if the webhook is older than `tolerance`.
pub fn verify_webhook_signature_with_current_time<
    HeaderKey: AsRef<[u8]>,
    HeaderValue: AsRef<[u8]>,
>(
    signature: &str,
    payload: &[u8],
    headers: &[(HeaderKey, HeaderValue)],
    subscription_secret: &str,
    tolerance: StdDuration,
    current_time: DateTime<Utc>,
) -> Result<(), Hook0ClientError> {
    let parsed_sig = match signature::Signature::parse(signature) {
        Ok(sig) => sig,
        Err(_) => return Err(Hook0ClientError::InvalidSignature),
    };

    // Index the provided headers by their parsed name.
    let mut provided_headers = std::collections::HashMap::new();
    for (raw_name, raw_value) in headers {
        let name = http::HeaderName::from_bytes(raw_name.as_ref()).map_err(|error| {
            Hook0ClientError::InvalidHeaderName {
                header_name: String::from_utf8_lossy(raw_name.as_ref()).into_owned(),
                error,
            }
        })?;
        provided_headers.insert(name, raw_value);
    }

    // Gather the values of the signed headers, in the order the signature lists them.
    let mut signed_header_values = Vec::new();
    for expected in parsed_sig.h.iter() {
        let raw_value = provided_headers
            .get(expected)
            .ok_or_else(|| Hook0ClientError::MissingHeader(expected.to_owned()))?;
        let value = String::from_utf8(raw_value.as_ref().to_vec()).map_err(|error| {
            Hook0ClientError::InvalidHeaderValue {
                header_name: expected.to_owned(),
                header_value: String::from_utf8_lossy(raw_value.as_ref()).into_owned(),
                error,
            }
        })?;
        signed_header_values.push(value);
    }

    if !parsed_sig.verify(payload, &signed_header_values, subscription_secret) {
        return Err(Hook0ClientError::InvalidSignature);
    }

    let signed_at = DateTime::from_timestamp(parsed_sig.timestamp, 0)
        .ok_or(Hook0ClientError::InvalidSignature)?;
    let tolerance = Duration::from_std(tolerance).map_err(Hook0ClientError::InvalidTolerance)?;

    if (current_time - signed_at) > tolerance {
        Err(Hook0ClientError::ExpiredWebhook {
            signed_at,
            tolerance,
            current_time,
        })
    } else {
        Ok(())
    }
}
320
#[cfg(feature = "consumer")]
/// Verifies a webhook signature, using the system clock as the current time.
///
/// This reads [`Utc::now`] and delegates to
/// [`verify_webhook_signature_with_current_time`]; see that function for the
/// checks performed and the errors returned.
pub fn verify_webhook_signature<HeaderKey: AsRef<[u8]>, HeaderValue: AsRef<[u8]>>(
    signature: &str,
    payload: &[u8],
    headers: &[(HeaderKey, HeaderValue)],
    subscription_secret: &str,
    tolerance: StdDuration,
) -> Result<(), Hook0ClientError> {
    let now = Utc::now();

    verify_webhook_signature_with_current_time(
        signature,
        payload,
        headers,
        subscription_secret,
        tolerance,
        now,
    )
}
345
#[cfg(feature = "producer")]
#[derive(Debug, Serialize, PartialEq, Eq)]
/// Event type name split into its three dot-separated parts
/// (`service.resource_type.verb`).
struct EventType {
    /// First part of the name.
    service: String,
    /// Second part of the name.
    resource_type: String,
    /// Third part of the name.
    verb: String,
}
354
#[cfg(feature = "producer")]
impl FromStr for EventType {
    type Err = ();

    /// Parses a `service.resource_type.verb` name.
    ///
    /// Each of the three parts must be a non-empty run of ASCII letters (any
    /// case), digits or underscores; anything else yields `Err(())`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match regex_captures!("^([A-Z0-9_]+)[.]([A-Z0-9_]+)[.]([A-Z0-9_]+)$"i, s) {
            Some((_whole, service, resource_type, verb)) => Ok(Self {
                service: String::from(service),
                resource_type: String::from(resource_type),
                verb: String::from(verb),
            }),
            None => Err(()),
        }
    }
}
372
373#[cfg(feature = "producer")]
374impl Display for EventType {
375 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
376 write!(f, "{}.{}.{}", self.service, self.resource_type, self.verb)
377 }
378}
379
#[cfg(feature = "producer")]
#[derive(Debug, Clone, PartialEq, Eq)]
/// An event that can be sent with [`Hook0Client::send_event`].
pub struct Event<'a> {
    /// Optional identifier of the event. When `None`, no `event_id` is included
    /// in the request.
    pub event_id: Option<&'a Uuid>,
    /// Name of the event type.
    // NOTE(review): presumably expected in the `service.resource_type.verb` form
    // used by `upsert_event_types`; it is not validated before sending.
    pub event_type: &'a str,
    /// Payload of the event, as text.
    pub payload: Cow<'a, str>,
    /// Content type of `payload`.
    // NOTE(review): presumably a MIME type such as `application/json` -- confirm
    // against the API documentation.
    pub payload_content_type: &'a str,
    /// Optional key/value pairs attached to the event. They are turned into a
    /// map before sending, so a later duplicate key replaces an earlier one.
    pub metadata: Option<Vec<(String, String)>>,
    /// When the event occurred. When `None`, the time at which the event is
    /// prepared for sending is used instead.
    pub occurred_at: Option<DateTime<Utc>>,
    /// Key/value labels attached to the event. They are turned into a map
    /// before sending, so a later duplicate key replaces an earlier one.
    pub labels: Vec<(String, String)>,
}
399
#[cfg(feature = "producer")]
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
/// Serializable form of an [`Event`] that is sent as the request body: the
/// event's data plus the application identifier and a resolved `occurred_at`.
struct FullEvent<'a> {
    /// Identifier of the application the event belongs to.
    pub application_id: Uuid,
    /// Optional identifier of the event; left out of the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub event_id: Option<&'a Uuid>,
    /// Name of the event type.
    pub event_type: &'a str,
    /// Payload of the event, as text.
    pub payload: &'a str,
    /// Content type of `payload`.
    pub payload_content_type: &'a str,
    /// Optional metadata, keyed by name.
    pub metadata: Option<HashMap<String, String>>,
    /// When the event occurred.
    pub occurred_at: DateTime<Utc>,
    /// Labels, keyed by name.
    pub labels: HashMap<String, String>,
}
413
#[cfg(feature = "producer")]
impl<'a> FullEvent<'a> {
    /// Builds the request body for `event` on behalf of `application_id`.
    ///
    /// String data is borrowed from `event`; metadata and labels are cloned into
    /// maps (a later duplicate key replaces an earlier one). A missing
    /// `occurred_at` is replaced with the current time.
    pub fn from_event(event: &'a Event, application_id: &Uuid) -> Self {
        let occurred_at = match event.occurred_at {
            Some(moment) => moment,
            None => Utc::now(),
        };
        let metadata = event
            .metadata
            .as_ref()
            .map(|pairs| pairs.iter().cloned().collect::<HashMap<String, String>>());
        let labels: HashMap<String, String> = event.labels.iter().cloned().collect();

        Self {
            application_id: *application_id,
            event_id: event.event_id,
            event_type: event.event_type,
            payload: &*event.payload,
            payload_content_type: event.payload_content_type,
            metadata,
            occurred_at,
            labels,
        }
    }
}
434
#[derive(Debug, thiserror::Error)]
/// Every error that this crate can return. The available variants depend on
/// the enabled features (`producer` and/or `consumer`).
pub enum Hook0ClientError {
    #[cfg(feature = "producer")]
    #[error("Could not build auth header: {0}")]
    /// The authentication token could not be turned into an `Authorization`
    /// header value.
    AuthHeader(InvalidHeaderValue),

    #[cfg(feature = "producer")]
    #[error("Could not build reqwest HTTP client: {0}")]
    /// The underlying reqwest HTTP client could not be built.
    ReqwestClient(reqwest::Error),

    #[cfg(feature = "producer")]
    #[error("Could not create a valid URL to request Hook0's API: {0}")]
    /// An endpoint URL could not be built from the API base URL.
    Url(ParseError),

    #[cfg(feature = "producer")]
    #[error("Sending event{} failed: {error} [body={}]", event_id.map(|id| format!(" {id}")).unwrap_or_else(String::new), body.as_deref().unwrap_or(""))]
    /// Sending an event failed: the request failed, the API answered with an
    /// error status, or its response could not be decoded.
    EventSending {
        /// Identifier of the event, when the caller provided one.
        event_id: Option<Uuid>,

        /// Underlying HTTP error.
        error: reqwest::Error,

        /// Body of the API's response, when one could be read after an error status.
        body: Option<String>,
    },

    #[cfg(feature = "producer")]
    #[error("Provided event type '{0}' does not have a valid syntax (service.resource_type.verb)")]
    /// The given event type name does not follow the
    /// `service.resource_type.verb` syntax.
    InvalidEventType(String),

    #[cfg(feature = "producer")]
    #[error("Getting available event types failed: {0}")]
    /// Listing the application's existing event types failed.
    GetAvailableEventTypes(reqwest::Error),

    #[cfg(feature = "producer")]
    #[error("Creating event type '{event_type_name}' failed: {error}")]
    /// Creating an event type failed.
    CreatingEventType {
        /// Name of the event type that could not be created.
        event_type_name: String,

        /// Underlying HTTP error.
        error: reqwest::Error,
    },

    #[cfg(feature = "consumer")]
    #[error("Invalid signature")]
    /// The webhook signature could not be parsed, does not match the payload,
    /// or carries a timestamp that cannot be represented.
    InvalidSignature,

    #[cfg(feature = "consumer")]
    #[error(
        "The webhook has expired because it was sent too long ago (signed_at={signed_at}, tolerance={tolerance}, current_time={current_time})"
    )]
    /// The webhook was signed longer ago than the accepted tolerance.
    ExpiredWebhook {
        /// Time at which the webhook was signed, taken from the signature.
        signed_at: DateTime<Utc>,

        /// Maximum accepted age of the signature.
        tolerance: Duration,

        /// Time the signature's age was measured against.
        current_time: DateTime<Utc>,
    },

    #[cfg(feature = "consumer")]
    #[error("Could not parse signature header: {0}")]
    /// The signature header could not be parsed.
    // NOTE(review): not constructed in this file; presumably produced by the
    // `signature` module.
    SignatureHeaderParsing(String),

    #[cfg(feature = "consumer")]
    #[error("Could not parse timestamp `{timestamp}` in signature: {error}")]
    /// The timestamp found in the signature is not a valid integer.
    // NOTE(review): not constructed in this file; presumably produced by the
    // `signature` module.
    TimestampParsing {
        /// Timestamp text that could not be parsed.
        timestamp: String,

        /// Underlying integer parsing error.
        error: std::num::ParseIntError,
    },

    #[cfg(feature = "consumer")]
    #[error("Could not parse v0 signature `{signature}`: {error}")]
    /// The v0 signature value is not valid hexadecimal.
    // NOTE(review): not constructed in this file; presumably produced by the
    // `signature` module.
    V0SignatureParsing {
        /// Signature text that could not be parsed.
        signature: String,

        /// Underlying hexadecimal decoding error.
        error: hex::FromHexError,
    },

    #[cfg(feature = "consumer")]
    #[error("Could not parse header name `{header}` in `h` field: {error}")]
    /// A header name listed in the signature's `h` field is not a valid HTTP
    /// header name.
    // NOTE(review): not constructed in this file; presumably produced by the
    // `signature` module.
    HeaderNameParsing {
        /// Header name that could not be parsed.
        header: String,

        /// Underlying header name error.
        error: http::header::InvalidHeaderName,
    },

    #[cfg(feature = "consumer")]
    #[error("Could not parse v1 signature `{signature}`: {error}")]
    /// The v1 signature value is not valid hexadecimal.
    // NOTE(review): not constructed in this file; presumably produced by the
    // `signature` module.
    V1SignatureParsing {
        /// Signature text that could not be parsed.
        signature: String,

        /// Underlying hexadecimal decoding error.
        error: hex::FromHexError,
    },

    #[cfg(feature = "consumer")]
    #[error("The `{0}` header present in the webhook's signature was not provided with a value")]
    /// A header listed in the signature was not among the headers provided for
    /// verification.
    MissingHeader(http::HeaderName),

    #[cfg(feature = "consumer")]
    #[error("Provided `{header_name}` has an invalid header name: {error}")]
    /// A header provided for verification has a name that is not a valid HTTP
    /// header name.
    InvalidHeaderName {
        /// Provided name, lossily decoded as UTF-8.
        header_name: String,

        /// Underlying header name error.
        error: http::header::InvalidHeaderName,
    },

    #[cfg(feature = "consumer")]
    #[error("Provided `{header_name}` has an invalid header value `{header_value}`: {error}")]
    /// The value of a header needed to verify the signature is not valid UTF-8.
    InvalidHeaderValue {
        /// Name of the header whose value is invalid.
        header_name: http::HeaderName,

        /// Provided value, lossily decoded as UTF-8.
        header_value: String,

        /// Underlying UTF-8 decoding error.
        error: std::string::FromUtf8Error,
    },

    #[cfg(feature = "consumer")]
    #[error("Invalid tolerance Duration: {0}")]
    /// The tolerance is out of the range supported for time comparisons.
    InvalidTolerance(OutOfRangeError),
}
599
#[cfg(feature = "producer")]
impl Hook0ClientError {
    /// Emits the error's message as an `error`-level log event, then hands the
    /// error back unchanged so the call can be chained while building a result.
    pub fn log_and_return(self) -> Self {
        error!("{}", self);
        self
    }
}
608
#[cfg(feature = "producer")]
/// Returns `base_url` with `segments` appended to its path.
///
/// A trailing slash on the base URL's path does not produce an empty segment,
/// so `https://host/api` and `https://host/api/` both give
/// `https://host/api/event` for `["event"]`. The scheme, authority, query and
/// fragment of `base_url` are left untouched. Each segment is treated as a
/// single path segment and is percent-encoded where needed.
///
/// # Errors
///
/// Returns [`url::ParseError::RelativeUrlWithCannotBeABaseBase`] when
/// `base_url` cannot have path segments (for instance a `mailto:` URL).
fn append_url_segments(base_url: &Url, segments: &[&str]) -> Result<Url, url::ParseError> {
    let mut url = base_url.clone();

    // Work on the path segments directly instead of rewriting the URL's text:
    // a textual `//` -> `/` replacement would also hit the `//` that follows
    // the scheme and anything in the query or fragment.
    url.path_segments_mut()
        .map_err(|()| url::ParseError::RelativeUrlWithCannotBeABaseBase)?
        .pop_if_empty()
        .extend(segments.iter().copied());

    Ok(url)
}
618
#[cfg(test)]
mod tests {
    use super::*;

    /// Unix timestamp carried by the `t=` field of the signatures used below.
    #[cfg(feature = "consumer")]
    const SIGNED_AT_TIMESTAMP: i64 = 1_636_936_200;

    /// Returns a fixed "current time" located `offset_secs` seconds after the
    /// moment the test signatures were issued, so that the expiry checks do not
    /// depend on the system clock.
    #[cfg(feature = "consumer")]
    fn time_after_signing(offset_secs: i64) -> DateTime<Utc> {
        DateTime::from_timestamp(SIGNED_AT_TIMESTAMP + offset_secs, 0)
            .expect("timestamp is within the supported range")
    }

    #[cfg(feature = "producer")]
    #[test]
    fn displaying_event_type() {
        let et = EventType {
            service: "service".to_owned(),
            resource_type: "resource".to_owned(),
            verb: "verb".to_owned(),
        };

        assert_eq!(et.to_string(), "service.resource.verb")
    }

    #[cfg(feature = "producer")]
    #[test]
    fn parsing_valid_event_type() {
        let et = EventType {
            service: "service".to_owned(),
            resource_type: "resource".to_owned(),
            verb: "verb".to_owned(),
        };

        assert_eq!(EventType::from_str(&et.to_string()), Ok(et))
    }

    #[cfg(feature = "producer")]
    #[test]
    fn parsing_invalid_event_type() {
        assert_eq!(EventType::from_str("test.test"), Err(()))
    }

    #[cfg(feature = "consumer")]
    #[test]
    fn verifying_valid_signature_v0() {
        let signature =
            "t=1636936200,v0=1b3d69df55f1e52f05224ba94a5162abeb17ef52cd7f4948c390f810d6a87e98";
        let payload = "hello !".as_bytes();
        let subscription_secret = "secret";
        // Largest tolerance that can be converted, so the system clock never
        // makes this signature look expired.
        let tolerance = StdDuration::from_secs((i64::MAX / 1000) as u64);

        assert!(
            verify_webhook_signature::<&str, &str>(
                signature,
                payload,
                &[],
                subscription_secret,
                tolerance
            )
            .is_ok()
        );
    }

    #[cfg(feature = "consumer")]
    #[test]
    fn verifying_valid_signature_v0_with_current_time() {
        let signature =
            "t=1636936200,v0=1b3d69df55f1e52f05224ba94a5162abeb17ef52cd7f4948c390f810d6a87e98";
        let payload = "hello !".as_bytes();
        let subscription_secret = "secret";
        let tolerance = StdDuration::from_secs(300);
        let current_time = time_after_signing(10);

        assert!(
            verify_webhook_signature_with_current_time::<&str, &str>(
                signature,
                payload,
                &[],
                subscription_secret,
                tolerance,
                current_time
            )
            .is_ok()
        );
    }

    #[cfg(feature = "consumer")]
    #[test]
    fn verifying_expired_signature_v0() {
        let signature =
            "t=1636936200,v0=1b3d69df55f1e52f05224ba94a5162abeb17ef52cd7f4948c390f810d6a87e98";
        let payload = "hello !".as_bytes();
        let subscription_secret = "secret";
        let tolerance = StdDuration::from_secs(300);

        assert!(
            verify_webhook_signature::<&str, &str>(
                signature,
                payload,
                &[],
                subscription_secret,
                tolerance
            )
            .is_err()
        );
    }

    #[cfg(feature = "consumer")]
    #[test]
    fn verifying_expired_signature_v0_with_current_time() {
        let signature =
            "t=1636936200,v0=1b3d69df55f1e52f05224ba94a5162abeb17ef52cd7f4948c390f810d6a87e98";
        let payload = "hello !".as_bytes();
        let subscription_secret = "secret";
        let tolerance = StdDuration::from_secs(300);
        // One second past the tolerance.
        let current_time = time_after_signing(301);

        assert!(matches!(
            verify_webhook_signature_with_current_time::<&str, &str>(
                signature,
                payload,
                &[],
                subscription_secret,
                tolerance,
                current_time
            ),
            Err(Hook0ClientError::ExpiredWebhook { .. })
        ));
    }

    #[cfg(feature = "consumer")]
    #[test]
    fn verifying_valid_signature_v1() {
        let signature = "t=1636936200,h=x-test x-test2,v1=493c35f05443fdb74cb99fd4f00e0e7653c2ab6b24fbc97f4a7bd4d56b31758a";
        let payload = "hello !".as_bytes();
        let header_values = [("x-test", "val1"), ("x-test2", "val2")];
        let subscription_secret = "secret";
        // Largest tolerance that can be converted, so the system clock never
        // makes this signature look expired.
        let tolerance = StdDuration::from_secs((i64::MAX / 1000) as u64);

        assert!(
            verify_webhook_signature::<&str, &str>(
                signature,
                payload,
                &header_values,
                subscription_secret,
                tolerance
            )
            .is_ok()
        );
    }

    #[cfg(feature = "consumer")]
    #[test]
    fn verifying_valid_signature_v1_with_current_time() {
        let signature = "t=1636936200,h=x-test x-test2,v1=493c35f05443fdb74cb99fd4f00e0e7653c2ab6b24fbc97f4a7bd4d56b31758a";
        let payload = "hello !".as_bytes();
        let header_values = [("x-test", "val1"), ("x-test2", "val2")];
        let subscription_secret = "secret";
        let tolerance = StdDuration::from_secs(300);
        let current_time = time_after_signing(10);

        assert!(
            verify_webhook_signature_with_current_time::<&str, &str>(
                signature,
                payload,
                &header_values,
                subscription_secret,
                tolerance,
                current_time
            )
            .is_ok()
        );
    }

    #[cfg(feature = "consumer")]
    #[test]
    fn verifying_expired_signature_v1_with_current_time() {
        let signature = "t=1636936200,h=x-test x-test2,v1=493c35f05443fdb74cb99fd4f00e0e7653c2ab6b24fbc97f4a7bd4d56b31758a";
        let payload = "hello !".as_bytes();
        let header_values = [("x-test", "val1"), ("x-test2", "val2")];
        let subscription_secret = "secret";
        let tolerance = StdDuration::from_secs(300);
        // One second past the tolerance.
        let current_time = time_after_signing(301);

        assert!(matches!(
            verify_webhook_signature_with_current_time::<&str, &str>(
                signature,
                payload,
                &header_values,
                subscription_secret,
                tolerance,
                current_time
            ),
            Err(Hook0ClientError::ExpiredWebhook { .. })
        ));
    }
}