use crate::monitor::IpChange;
use crate::time::{Sleeper, TokioSleeper};
use super::{HttpClient, HttpError, HttpRequest, RetryPolicy, RetryableError, WebhookError};
use handlebars::Handlebars;
use serde::Serialize;
/// Abstraction over anything that can deliver a batch of IP changes to a webhook.
///
/// Implementors must be `Send + Sync`, and the future returned by
/// [`WebhookSender::send`] must itself be `Send`.
pub trait WebhookSender: Send + Sync {
/// Delivers `changes` to the webhook.
///
/// # Errors
///
/// Resolves to a [`WebhookError`] when delivery fails; the exact failure
/// conditions are defined by the implementor.
fn send(
&self,
changes: &[IpChange],
) -> impl std::future::Future<Output = Result<(), WebhookError>> + Send;
}
/// Webhook sender that issues an HTTP request through a client of type `H`
/// and uses a sleeper of type `S` (defaulting to [`TokioSleeper`]) to wait
/// between retry attempts.
#[derive(Debug)]
pub struct HttpWebhook<H, S = TokioSleeper> {
/// Client used to perform the HTTP request.
client: H,
/// Sleeper awaited between retry attempts.
sleeper: S,
/// Target URL of the webhook.
url: url::Url,
/// HTTP method used for the request.
method: http::Method,
/// Headers appended to every outgoing request.
headers: http::HeaderMap,
/// Optional Handlebars template for the request body; `None` means no body is rendered.
body_template: Option<String>,
/// Policy controlling the number of attempts and the delay between them.
retry_policy: RetryPolicy,
}
impl<H> HttpWebhook<H, TokioSleeper> {
    /// Builds a webhook targeting `url` that sends through `client`.
    ///
    /// The result starts with these defaults, each adjustable through the
    /// `with_*` builder methods:
    ///
    /// * method: `POST`
    /// * headers: empty
    /// * body template: none
    /// * retry policy: `RetryPolicy::default()`
    /// * sleeper: [`TokioSleeper`]
    #[must_use]
    pub fn new(client: H, url: url::Url) -> Self {
        let method = http::Method::POST;
        let headers = http::HeaderMap::new();
        let retry_policy = RetryPolicy::default();

        Self {
            client,
            sleeper: TokioSleeper,
            url,
            method,
            headers,
            body_template: None,
            retry_policy,
        }
    }
}
impl<H, S> HttpWebhook<H, S> {
    /// Replaces the sleeper, changing the sleeper type parameter to `S2`.
    ///
    /// Every other setting is carried over unchanged; the previous sleeper
    /// is dropped.
    #[must_use]
    pub fn with_sleeper<S2>(self, sleeper: S2) -> HttpWebhook<H, S2> {
        // `..` skips the old sleeper so `sleeper` below is the new argument.
        let Self {
            client,
            url,
            method,
            headers,
            body_template,
            retry_policy,
            ..
        } = self;

        HttpWebhook {
            client,
            sleeper,
            url,
            method,
            headers,
            body_template,
            retry_policy,
        }
    }

    /// Overrides the HTTP method used for the request.
    #[must_use]
    pub fn with_method(self, method: http::Method) -> Self {
        Self { method, ..self }
    }

    /// Replaces the full set of headers sent with the request.
    #[must_use]
    pub fn with_headers(self, headers: http::HeaderMap) -> Self {
        Self { headers, ..self }
    }

    /// Sets the template used to render the request body.
    #[must_use]
    pub fn with_body_template(self, template: impl Into<String>) -> Self {
        Self {
            body_template: Some(template.into()),
            ..self
        }
    }

    /// Overrides the retry policy.
    #[must_use]
    pub const fn with_retry_policy(mut self, policy: RetryPolicy) -> Self {
        // In-place assignment is kept (rather than struct-update syntax) so
        // this stays usable as a `const fn`.
        self.retry_policy = policy;
        self
    }

    /// Returns the webhook's target URL.
    #[must_use]
    pub const fn url(&self) -> &url::Url {
        &self.url
    }

    /// Returns the HTTP method used for the request.
    #[must_use]
    pub const fn method(&self) -> &http::Method {
        &self.method
    }

    /// Returns the retry policy currently in effect.
    #[must_use]
    pub const fn retry_policy(&self) -> &RetryPolicy {
        &self.retry_policy
    }
}
/// Root object serialized and handed to the body template as its data.
#[derive(Serialize)]
struct TemplateData<'a> {
/// One entry per IP change being reported, in the order received.
changes: Vec<ChangeData<'a>>,
}
/// Serializable, template-facing view of a single [`IpChange`].
#[derive(Serialize)]
struct ChangeData<'a> {
/// Adapter name, borrowed from the originating change.
adapter: &'a str,
/// Address rendered to a string via `to_string()`.
address: String,
/// Either `"added"` or `"removed"`.
kind: &'static str,
/// Whole seconds since the Unix epoch (0 if the change's timestamp precedes the epoch).
timestamp: u64,
}
impl<'a> From<&'a IpChange> for ChangeData<'a> {
    /// Projects an [`IpChange`] into the shape exposed to body templates.
    ///
    /// * `kind` is `"added"` when `change.is_added()` holds, otherwise `"removed"`.
    /// * `timestamp` is the number of whole seconds between the Unix epoch and
    ///   `change.timestamp`; if that computation fails (the timestamp lies
    ///   before the epoch) it falls back to `0`.
    fn from(change: &'a IpChange) -> Self {
        let kind = if change.is_added() { "added" } else { "removed" };

        let since_epoch = change.timestamp.duration_since(std::time::UNIX_EPOCH);
        let timestamp = match since_epoch {
            Ok(elapsed) => elapsed.as_secs(),
            Err(_) => 0,
        };

        Self {
            adapter: &change.adapter,
            address: change.address.to_string(),
            kind,
            timestamp,
        }
    }
}
impl<H: HttpClient, S: Sleeper> HttpWebhook<H, S> {
    /// Renders the configured body template against `changes`.
    ///
    /// Returns `Ok(None)` when no template is configured, otherwise the
    /// rendered text as UTF-8 bytes.
    ///
    /// # Errors
    ///
    /// Returns `RetryableError::Template` carrying the render error's message
    /// when Handlebars fails to render the template.
    fn render_body(&self, changes: &[IpChange]) -> Result<Option<Vec<u8>>, RetryableError> {
        match self.body_template.as_deref() {
            None => Ok(None),
            Some(template) => {
                let data = TemplateData {
                    changes: changes.iter().map(ChangeData::from).collect(),
                };
                // A fresh registry with default settings is used for every render.
                // NOTE(review): Handlebars' defaults HTML-escape `{{..}}` output;
                // confirm that is intended for non-HTML (e.g. JSON) bodies.
                let registry = Handlebars::new();
                registry
                    .render_template(template, &data)
                    .map(|text| Some(text.into_bytes()))
                    .map_err(|err| RetryableError::Template(err.to_string()))
            }
        }
    }

    /// Assembles the outgoing request: configured method and URL, every
    /// configured header appended, and the rendered body when a template is set.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`Self::render_body`].
    fn build_request(&self, changes: &[IpChange]) -> Result<HttpRequest, RetryableError> {
        let mut request = HttpRequest::new(self.method.clone(), self.url.clone());

        for (name, value) in self.headers.iter() {
            request.headers.append(name, value.clone());
        }

        // The body is only overwritten when a template actually produced one.
        let rendered = self.render_body(changes)?;
        if let Some(bytes) = rendered {
            request.body = Some(bytes);
        }

        Ok(request)
    }

    /// Sends one copy of `request` through the client.
    ///
    /// # Errors
    ///
    /// Propagates client errors, and returns
    /// `RetryableError::NonSuccessStatus` (with the status and, if available,
    /// the body text) when the response is not a success.
    async fn execute_request(&self, request: &HttpRequest) -> Result<(), RetryableError> {
        // The client consumes the request, so each attempt gets its own clone.
        let response = self.client.request(request.clone()).await?;

        if !response.is_success() {
            return Err(RetryableError::NonSuccessStatus {
                status: response.status,
                body: response.body_text().map(ToString::to_string),
            });
        }

        Ok(())
    }

    /// Sends `changes`, retrying according to the configured retry policy.
    ///
    /// The request is built once and reused for every attempt. A
    /// non-retryable error aborts immediately; a retryable one is remembered
    /// and, when the policy allows another try, followed by a sleep of the
    /// policy-provided delay.
    ///
    /// # Errors
    ///
    /// * Request-building errors and non-retryable errors, converted into
    ///   [`WebhookError`].
    /// * `WebhookError::MaxRetriesExceeded` once all attempts failed with
    ///   retryable errors.
    ///
    /// # Panics
    ///
    /// Panics if the loop ran zero times, i.e. the policy's `max_attempts`
    /// is less than 1.
    async fn send_with_retry(&self, changes: &[IpChange]) -> Result<(), WebhookError> {
        let request = self.build_request(changes)?;
        let mut latest_failure: Option<RetryableError> = None;

        for attempt in 1..=self.retry_policy.max_attempts {
            let Err(failure) = self.execute_request(&request).await else {
                return Ok(());
            };

            if !failure.is_retryable() {
                return Err(failure.into());
            }
            latest_failure = Some(failure);

            if !self.retry_policy.should_retry(attempt) {
                continue;
            }
            // Retries are indexed from zero, attempts from one.
            let pause = self.retry_policy.delay_for_retry(attempt - 1);
            self.sleeper.sleep(pause).await;
        }

        Err(WebhookError::MaxRetriesExceeded {
            attempts: self.retry_policy.max_attempts,
            last_error: latest_failure.expect("max_attempts >= 1 ensures at least one attempt"),
        })
    }
}
/// [`WebhookSender`] implementation backed by the retrying HTTP send path.
impl<H: HttpClient, S: Sleeper> WebhookSender for HttpWebhook<H, S> {
/// Sends `changes` by delegating to `send_with_retry` and returning its
/// result unchanged.
async fn send(&self, changes: &[IpChange]) -> Result<(), WebhookError> {
self.send_with_retry(changes).await
}
}
/// Classifies a value (in this file, error types) as worth retrying or not.
pub trait IsRetryable {
/// Returns `true` when the operation that produced `self` may be retried.
fn is_retryable(&self) -> bool;
}
impl IsRetryable for HttpError {
    /// Connection failures and timeouts are retryable; an invalid URL is not.
    fn is_retryable(&self) -> bool {
        // Kept as an exhaustive match so a new variant forces a decision here.
        match self {
            Self::InvalidUrl(_) => false,
            Self::Timeout | Self::Connection(_) => true,
        }
    }
}
impl IsRetryable for RetryableError {
    /// Decides whether the failed attempt is worth repeating.
    ///
    /// * Template errors are never retryable.
    /// * HTTP-level errors defer to the wrapped error's own classification.
    /// * Non-success statuses are retryable for any server error, for
    ///   `429 Too Many Requests`, and for `408 Request Timeout`.
    fn is_retryable(&self) -> bool {
        match self {
            Self::Template(_) => false,
            Self::Http(inner) => inner.is_retryable(),
            Self::NonSuccessStatus { status, .. } if status.is_server_error() => true,
            Self::NonSuccessStatus { status, .. } => {
                *status == http::StatusCode::TOO_MANY_REQUESTS
                    || *status == http::StatusCode::REQUEST_TIMEOUT
            }
        }
    }
}