1use crate::monitor::IpChange;
4use crate::time::{Sleeper, TokioSleeper};
5
6use super::{HttpClient, HttpError, HttpRequest, RetryPolicy, RetryableError, WebhookError};
7use handlebars::Handlebars;
8use serde::Serialize;
9
10pub trait WebhookSender: Send + Sync {
20 fn send(
30 &self,
31 changes: &[IpChange],
32 ) -> impl std::future::Future<Output = Result<(), WebhookError>> + Send;
33}
34
35#[derive(Debug)]
66pub struct HttpWebhook<H, S = TokioSleeper> {
67 client: H,
68 sleeper: S,
69 url: url::Url,
70 method: http::Method,
71 headers: http::HeaderMap,
72 body_template: Option<String>,
73 retry_policy: RetryPolicy,
74}
75
76impl<H> HttpWebhook<H, TokioSleeper> {
77 #[must_use]
82 pub fn new(client: H, url: url::Url) -> Self {
83 Self {
84 client,
85 sleeper: TokioSleeper,
86 url,
87 method: http::Method::POST,
88 headers: http::HeaderMap::new(),
89 body_template: None,
90 retry_policy: RetryPolicy::default(),
91 }
92 }
93}
94
95impl<H, S> HttpWebhook<H, S> {
96 #[must_use]
100 pub fn with_sleeper<S2>(self, sleeper: S2) -> HttpWebhook<H, S2> {
101 HttpWebhook {
102 client: self.client,
103 sleeper,
104 url: self.url,
105 method: self.method,
106 headers: self.headers,
107 body_template: self.body_template,
108 retry_policy: self.retry_policy,
109 }
110 }
111
112 #[must_use]
114 pub fn with_method(mut self, method: http::Method) -> Self {
115 self.method = method;
116 self
117 }
118
119 #[must_use]
121 pub fn with_headers(mut self, headers: http::HeaderMap) -> Self {
122 self.headers = headers;
123 self
124 }
125
126 #[must_use]
128 pub fn with_body_template(mut self, template: impl Into<String>) -> Self {
129 self.body_template = Some(template.into());
130 self
131 }
132
133 #[must_use]
135 pub const fn with_retry_policy(mut self, policy: RetryPolicy) -> Self {
136 self.retry_policy = policy;
137 self
138 }
139
140 #[must_use]
142 pub const fn url(&self) -> &url::Url {
143 &self.url
144 }
145
146 #[must_use]
148 pub const fn method(&self) -> &http::Method {
149 &self.method
150 }
151
152 #[must_use]
154 pub const fn retry_policy(&self) -> &RetryPolicy {
155 &self.retry_policy
156 }
157}
158
159#[derive(Serialize)]
161struct TemplateData<'a> {
162 changes: Vec<ChangeData<'a>>,
163}
164
165#[derive(Serialize)]
167struct ChangeData<'a> {
168 adapter: &'a str,
169 address: String,
170 kind: &'static str,
171 timestamp: u64,
172}
173
174impl<'a> From<&'a IpChange> for ChangeData<'a> {
175 fn from(change: &'a IpChange) -> Self {
176 let kind = if change.is_added() {
177 "added"
178 } else {
179 "removed"
180 };
181 let timestamp = change
183 .timestamp
184 .duration_since(std::time::UNIX_EPOCH)
185 .map_or(0, |d| d.as_secs());
186
187 Self {
188 adapter: &change.adapter,
189 address: change.address.to_string(),
190 kind,
191 timestamp,
192 }
193 }
194}
195
196impl<H: HttpClient, S: Sleeper> HttpWebhook<H, S> {
197 fn render_body(&self, changes: &[IpChange]) -> Result<Option<Vec<u8>>, RetryableError> {
199 let Some(template) = &self.body_template else {
200 return Ok(None);
201 };
202
203 let data = TemplateData {
204 changes: changes.iter().map(ChangeData::from).collect(),
205 };
206
207 let handlebars = Handlebars::new();
208 let rendered = handlebars
209 .render_template(template, &data)
210 .map_err(|e| RetryableError::Template(e.to_string()))?;
211
212 Ok(Some(rendered.into_bytes()))
213 }
214
215 fn build_request(&self, changes: &[IpChange]) -> Result<HttpRequest, RetryableError> {
217 let mut request = HttpRequest::new(self.method.clone(), self.url.clone());
218
219 for (name, value) in &self.headers {
221 request.headers.append(name, value.clone());
222 }
223
224 if let Some(body) = self.render_body(changes)? {
226 request.body = Some(body);
227 }
228
229 Ok(request)
230 }
231
232 async fn execute_request(&self, request: &HttpRequest) -> Result<(), RetryableError> {
234 let response = self.client.request(request.clone()).await?;
235
236 if response.is_success() {
237 return Ok(());
238 }
239
240 Err(RetryableError::NonSuccessStatus {
241 status: response.status,
242 body: response.body_text().map(ToString::to_string),
243 })
244 }
245
246 async fn send_with_retry(&self, changes: &[IpChange]) -> Result<(), WebhookError> {
248 let request = self.build_request(changes)?;
249
250 let mut last_error: Option<RetryableError> = None;
251
252 for attempt in 1..=self.retry_policy.max_attempts {
253 match self.execute_request(&request).await {
254 Ok(()) => return Ok(()),
255 Err(e) => {
256 if !e.is_retryable() {
258 return Err(e.into());
259 }
260
261 last_error = Some(e);
262
263 if self.retry_policy.should_retry(attempt) {
265 let delay = self.retry_policy.delay_for_retry(attempt - 1);
266 self.sleeper.sleep(delay).await;
267 }
268 }
269 }
270 }
271
272 Err(WebhookError::MaxRetriesExceeded {
273 attempts: self.retry_policy.max_attempts,
274 last_error: last_error.expect("max_attempts >= 1 ensures at least one attempt"),
275 })
276 }
277}
278
279impl<H: HttpClient, S: Sleeper> WebhookSender for HttpWebhook<H, S> {
280 async fn send(&self, changes: &[IpChange]) -> Result<(), WebhookError> {
281 self.send_with_retry(changes).await
282 }
283}
284
285pub trait IsRetryable {
291 fn is_retryable(&self) -> bool;
293}
294
295impl IsRetryable for HttpError {
296 fn is_retryable(&self) -> bool {
297 match self {
298 Self::Connection(_) | Self::Timeout => true,
300 Self::InvalidUrl(_) => false,
302 }
303 }
304}
305
306impl IsRetryable for RetryableError {
307 fn is_retryable(&self) -> bool {
308 match self {
309 Self::Http(e) => e.is_retryable(),
310 Self::NonSuccessStatus { status, .. } => {
314 status.is_server_error()
315 || *status == http::StatusCode::TOO_MANY_REQUESTS
316 || *status == http::StatusCode::REQUEST_TIMEOUT
317 }
318 Self::Template(_) => false,
320 }
321 }
322}