1#![allow(
2 clippy::significant_drop_tightening,
3 clippy::missing_panics_doc,
4 clippy::missing_errors_doc
5)]
6use chrono::{DateTime, Utc};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::future::Future;
12use std::pin::Pin;
13use std::sync::{Arc, RwLock};
14
15use crate::http_client::Client;
16use crate::{AppState, AutumnError, AutumnResult};
17
/// Upper bound, in bytes, on a response body kept in a delivery log
/// (the truncation suffix is counted inside this budget).
const MAX_LOGGED_RESPONSE_BODY_BYTES: usize = 16 * 1024;
/// Marker appended to a logged response body that had to be cut short.
const TRUNCATED_RESPONSE_BODY_SUFFIX: &str = "\n[truncated]";
20
/// Lifecycle state of a webhook subscription.
///
/// Serialized with lowercase variant names (`rename_all = "lowercase"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum WebhookSubscriptionStatus {
    /// Eligible for delivery; the in-memory store only tracks failures for
    /// subscriptions in this state.
    Active,
    /// Deliveries are skipped and the log records "Subscription is disabled".
    Disabled,
    /// Set by the in-memory store after too many consecutive failures;
    /// the delivery job only sends to such a subscription on replay.
    Failed,
}
29
30impl WebhookSubscriptionStatus {
31 #[must_use]
33 pub const fn as_str(self) -> &'static str {
34 match self {
35 Self::Active => "active",
36 Self::Disabled => "disabled",
37 Self::Failed => "failed",
38 }
39 }
40}
41
42impl std::fmt::Display for WebhookSubscriptionStatus {
43 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44 f.write_str(self.as_str())
45 }
46}
47
/// A registered receiver of outbound webhook events.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebhookSubscription {
    /// Identifier; the in-memory store uses it as the map key.
    pub id: String,
    /// URL the delivery job POSTs the payload to.
    pub target_url: String,
    /// Topics this subscription receives; matched by exact string equality.
    pub event_topics: Vec<String>,
    /// Key used for the HMAC-SHA256 signature sent in `Autumn-Signature`.
    pub secret: String,
    /// Current lifecycle state.
    pub status: WebhookSubscriptionStatus,
    /// Number of failed deliveries since the last success or reset.
    pub consecutive_failures: u32,
}
58
/// Record of one webhook delivery and the state of its latest attempt.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebhookDeliveryLog {
    /// Identifier of this log entry (`dispatch` generates a UUID v4).
    pub id: String,
    /// Id of the subscription the delivery targets.
    pub subscription_id: String,
    /// Topic the event was dispatched under.
    pub topic: String,
    /// Serialized JSON payload; sent verbatim as the request body and signed.
    pub payload: String,
    /// Headers sent with the most recent attempt.
    pub request_headers: HashMap<String, String>,
    /// HTTP status of the most recent attempt, if a response was received.
    pub response_status: Option<u16>,
    /// Response body of the most recent attempt, capped before being stored.
    pub response_body: Option<String>,
    /// Duration of the most recent HTTP request in milliseconds.
    pub elapsed_ms: u64,
    /// Attempt counter; `dispatch` starts it at 1 and the delivery job bumps
    /// it when it picks up a log that already carries an outcome.
    pub attempt: u32,
    /// Once `attempt` reaches this value a failure is moved to the DLQ.
    pub max_attempts: u32,
    /// Whether the entry sits in the dead-letter queue.
    pub is_dlq: bool,
    /// Description of the most recent failure, if any.
    pub last_error: Option<String>,
    /// Time the entry was created or last updated.
    pub timestamp: DateTime<Utc>,
}
76
/// Storage backend for webhook subscriptions and delivery logs.
///
/// Every method returns a boxed `Send` future that does not borrow from
/// `self`, so the trait can be used as `Arc<dyn OutboundWebhookHandler>`.
pub trait OutboundWebhookHandler: Send + Sync + 'static {
    /// Returns the subscriptions that should receive events for `topic`.
    fn get_subscriptions(
        &self,
        topic: &str,
    ) -> Pin<Box<dyn Future<Output = AutumnResult<Vec<WebhookSubscription>>> + Send>>;

    /// Persists `log` as a delivery event (attempt start or attempt outcome).
    ///
    /// The in-memory implementation also updates the subscription's
    /// consecutive-failure bookkeeping from the outcome carried by `log`.
    fn log_delivery(
        &self,
        log: WebhookDeliveryLog,
    ) -> Pin<Box<dyn Future<Output = AutumnResult<()>> + Send>>;

    /// Overwrites the stored log with `log` without treating it as a
    /// delivery outcome (used, for example, to flag an enqueue failure as DLQ).
    fn replace_delivery_log(
        &self,
        log: WebhookDeliveryLog,
    ) -> Pin<Box<dyn Future<Output = AutumnResult<()>> + Send>>;

    /// Looks up a subscription by id; resolves to `None` when it is unknown.
    fn get_subscription(
        &self,
        id: &str,
    ) -> Pin<Box<dyn Future<Output = AutumnResult<Option<WebhookSubscription>>> + Send>>;

    /// Returns the delivery logs currently in the dead-letter queue.
    ///
    /// The default implementation reports an empty list.
    fn get_dlq_logs(
        &self,
    ) -> Pin<Box<dyn Future<Output = AutumnResult<Vec<WebhookDeliveryLog>>> + Send>> {
        Box::pin(async { Ok(Vec::new()) })
    }

    /// Looks up a delivery log by id; resolves to `None` when it is unknown.
    fn get_delivery_log(
        &self,
        id: &str,
    ) -> Pin<Box<dyn Future<Output = AutumnResult<Option<WebhookDeliveryLog>>> + Send>>;

    /// Clears the consecutive-failure counter of subscription `_id`.
    ///
    /// The default implementation does nothing and succeeds.
    fn reset_subscription_failures(
        &self,
        _id: &str,
    ) -> Pin<Box<dyn Future<Output = AutumnResult<()>> + Send>> {
        Box::pin(async { Ok(()) })
    }

    /// Called after a successful delivery so a failed subscription can be
    /// brought back into service.
    ///
    /// The default implementation only forwards to
    /// [`reset_subscription_failures`](Self::reset_subscription_failures).
    fn reactivate_failed_subscription(
        &self,
        id: &str,
    ) -> Pin<Box<dyn Future<Output = AutumnResult<()>> + Send>> {
        self.reset_subscription_failures(id)
    }
}
138
/// Alternative name for [`OutboundWebhookHandler`].
pub use OutboundWebhookHandler as OutboundWebhookStore;
141
/// [`OutboundWebhookHandler`] that keeps everything in process memory
/// behind two `RwLock`-guarded maps.
#[derive(Debug, Default)]
pub struct InMemoryOutboundWebhookHandler {
    /// Subscriptions keyed by subscription id.
    subscriptions: RwLock<HashMap<String, WebhookSubscription>>,
    /// Delivery logs keyed by log id.
    logs: RwLock<HashMap<String, WebhookDeliveryLog>>,
}
148
/// Alternative name for [`InMemoryOutboundWebhookHandler`].
pub type InMemoryOutboundWebhookStore = InMemoryOutboundWebhookHandler;
151
152impl InMemoryOutboundWebhookHandler {
153 #[must_use]
155 pub fn new() -> Self {
156 Self::default()
157 }
158
159 #[allow(clippy::unused_async)]
161 pub async fn create_subscription(
162 &self,
163 sub: WebhookSubscription,
164 ) -> AutumnResult<WebhookSubscription> {
165 let mut subs = self
166 .subscriptions
167 .write()
168 .expect("subscriptions write lock poisoned");
169 subs.insert(sub.id.clone(), sub.clone());
170 Ok(sub)
171 }
172
173 #[allow(clippy::unused_async)]
175 pub async fn get_delivery_logs(&self) -> AutumnResult<Vec<WebhookDeliveryLog>> {
176 let logs = self.logs.read().expect("logs read lock poisoned");
177 let mut list: Vec<WebhookDeliveryLog> = logs.values().cloned().collect();
178 list.sort_by_key(|l| l.timestamp);
179 list.reverse();
180 Ok(list)
181 }
182
183 #[allow(clippy::unused_async)]
185 pub async fn get_subscription(&self, id: &str) -> AutumnResult<Option<WebhookSubscription>> {
186 let subs = self
187 .subscriptions
188 .read()
189 .expect("subscriptions read lock poisoned");
190 Ok(subs.get(id).cloned())
191 }
192}
193
194impl OutboundWebhookHandler for InMemoryOutboundWebhookHandler {
195 fn get_subscriptions(
196 &self,
197 topic: &str,
198 ) -> Pin<Box<dyn Future<Output = AutumnResult<Vec<WebhookSubscription>>> + Send>> {
199 let subs = self
200 .subscriptions
201 .read()
202 .expect("subscriptions read lock poisoned");
203 let topic = topic.to_owned();
204 let list: Vec<WebhookSubscription> = subs
205 .values()
206 .filter(|sub| {
207 sub.event_topics.iter().any(|t| t == &topic)
208 && sub.status == WebhookSubscriptionStatus::Active
209 })
210 .cloned()
211 .collect();
212 Box::pin(async move { Ok(list) })
213 }
214
215 fn get_subscription(
216 &self,
217 id: &str,
218 ) -> Pin<Box<dyn Future<Output = AutumnResult<Option<WebhookSubscription>>> + Send>> {
219 let subs = self
220 .subscriptions
221 .read()
222 .expect("subscriptions read lock poisoned");
223 let sub = subs.get(id).cloned();
224 Box::pin(async move { Ok(sub) })
225 }
226
227 fn log_delivery(
228 &self,
229 log: WebhookDeliveryLog,
230 ) -> Pin<Box<dyn Future<Output = AutumnResult<()>> + Send>> {
231 let mut logs = self.logs.write().expect("logs write lock poisoned");
232 logs.insert(log.id.clone(), log.clone());
233
234 let mut subs = self
236 .subscriptions
237 .write()
238 .expect("subscriptions write lock poisoned");
239 if let Some(sub) = subs.get_mut(&log.subscription_id) {
240 let is_active = sub.status == WebhookSubscriptionStatus::Active;
241 if is_active {
242 if let Some(status) = log.response_status {
243 if (200..300).contains(&status) {
244 sub.consecutive_failures = 0;
245 } else {
246 sub.consecutive_failures = sub.consecutive_failures.saturating_add(1);
247 if sub.consecutive_failures >= 50 {
248 sub.status = WebhookSubscriptionStatus::Failed;
249 tracing::warn!(subscription_id = %sub.id, "Webhook subscription auto-disabled due to 50 consecutive failures");
250 }
251 }
252 } else if log.last_error.is_some() {
253 sub.consecutive_failures = sub.consecutive_failures.saturating_add(1);
254 if sub.consecutive_failures >= 50 {
255 sub.status = WebhookSubscriptionStatus::Failed;
256 tracing::warn!(subscription_id = %sub.id, "Webhook subscription auto-disabled due to 50 consecutive failures");
257 }
258 }
259 }
260 }
261
262 Box::pin(async move { Ok(()) })
263 }
264
265 fn replace_delivery_log(
266 &self,
267 log: WebhookDeliveryLog,
268 ) -> Pin<Box<dyn Future<Output = AutumnResult<()>> + Send>> {
269 let mut logs = self.logs.write().expect("logs write lock poisoned");
270 logs.insert(log.id.clone(), log);
271 Box::pin(async move { Ok(()) })
272 }
273
274 fn get_dlq_logs(
275 &self,
276 ) -> Pin<Box<dyn Future<Output = AutumnResult<Vec<WebhookDeliveryLog>>> + Send>> {
277 let list = {
278 let logs = self.logs.read().expect("logs read lock poisoned");
279 let mut list: Vec<WebhookDeliveryLog> =
280 logs.values().filter(|l| l.is_dlq).cloned().collect();
281 list.sort_by_key(|l| l.timestamp);
282 list.reverse();
283 list
284 };
285 Box::pin(async move { Ok(list) })
286 }
287
288 fn get_delivery_log(
289 &self,
290 id: &str,
291 ) -> Pin<Box<dyn Future<Output = AutumnResult<Option<WebhookDeliveryLog>>> + Send>> {
292 let log = self
293 .logs
294 .read()
295 .expect("logs read lock poisoned")
296 .get(id)
297 .cloned();
298 Box::pin(async move { Ok(log) })
299 }
300
301 fn reset_subscription_failures(
302 &self,
303 id: &str,
304 ) -> Pin<Box<dyn Future<Output = AutumnResult<()>> + Send>> {
305 {
306 let mut subs = self
307 .subscriptions
308 .write()
309 .expect("subscriptions write lock poisoned");
310 if let Some(sub) = subs.get_mut(id) {
311 sub.consecutive_failures = 0;
312 }
313 }
314 Box::pin(async move { Ok(()) })
315 }
316
317 fn reactivate_failed_subscription(
318 &self,
319 id: &str,
320 ) -> Pin<Box<dyn Future<Output = AutumnResult<()>> + Send>> {
321 {
322 let mut subs = self
323 .subscriptions
324 .write()
325 .expect("subscriptions write lock poisoned");
326 if let Some(sub) = subs.get_mut(id) {
327 sub.consecutive_failures = 0;
328 if sub.status == WebhookSubscriptionStatus::Failed {
329 sub.status = WebhookSubscriptionStatus::Active;
330 }
331 }
332 }
333 Box::pin(async move { Ok(()) })
334 }
335}
336
/// Callback that takes over delivery of a single webhook.
///
/// It receives the application state, the target subscription and the
/// delivery log that `WebhookOutboundManager::dispatch` has already stored.
pub type WebhookDelegate = Arc<
    dyn Fn(
            &AppState,
            WebhookSubscription,
            WebhookDeliveryLog,
        ) -> Pin<Box<dyn Future<Output = AutumnResult<()>> + Send>>
        + Send
        + Sync,
>;
347
/// State-extension wrapper around a [`WebhookDelegate`].
///
/// When this extension is present, `dispatch` calls the delegate instead of
/// enqueuing the fallback delivery job.
#[derive(Clone)]
pub struct WebhookDelegateExt(pub WebhookDelegate);
351
/// Entry point for dispatching outbound webhooks.
#[derive(Clone)]
pub struct WebhookOutboundManager {
    /// Store for subscriptions and delivery logs.
    handler: Arc<dyn OutboundWebhookHandler>,
    /// HTTP client used by the delivery job to POST payloads.
    client: Client,
    /// Configured initial backoff in milliseconds.
    /// NOTE(review): stored but not read anywhere in the visible code — confirm usage.
    initial_backoff_ms: u64,
}
359
360impl WebhookOutboundManager {
361 pub fn new(handler: Arc<dyn OutboundWebhookHandler>) -> Self {
363 Self {
364 handler,
365 client: Client::new(),
366 initial_backoff_ms: 1000,
367 }
368 }
369
370 #[must_use]
372 pub const fn with_initial_backoff_ms(mut self, ms: u64) -> Self {
373 self.initial_backoff_ms = ms;
374 self
375 }
376
377 fn with_client_from_state(mut self, state: &AppState) -> Self {
378 self.client = Client::from_state(state);
379 self
380 }
381
382 #[must_use]
384 pub fn store(&self) -> &Arc<dyn OutboundWebhookHandler> {
385 &self.handler
386 }
387
388 #[must_use]
390 pub const fn client(&self) -> &Client {
391 &self.client
392 }
393
394 pub async fn dispatch<T: Serialize + Sync>(
400 &self,
401 state: &AppState,
402 topic: &str,
403 payload: &T,
404 ) -> AutumnResult<()> {
405 let serialized = serde_json::to_string(payload).map_err(|e| {
406 AutumnError::internal_server_error_msg(format!("failed to serialize payload: {e}"))
407 })?;
408
409 let mut errors = Vec::new();
410 let subs = self.handler.get_subscriptions(topic).await?;
411 for sub in subs {
412 if sub.status == WebhookSubscriptionStatus::Disabled {
413 continue;
414 }
415
416 let log_id = uuid::Uuid::new_v4().to_string();
417 let log = WebhookDeliveryLog {
418 id: log_id.clone(),
419 subscription_id: sub.id.clone(),
420 topic: topic.to_owned(),
421 payload: serialized.clone(),
422 request_headers: HashMap::new(),
423 response_status: None,
424 response_body: None,
425 elapsed_ms: 0,
426 attempt: 1,
427 max_attempts: 5,
428 is_dlq: false,
429 last_error: None,
430 timestamp: Utc::now(),
431 };
432
433 if let Err(e) = self.handler.log_delivery(log.clone()).await {
435 errors.push(e);
436 continue;
437 }
438
439 if let Some(delegate_ext) = state.extension::<WebhookDelegateExt>() {
441 tracing::info!(subscription_id = %sub.id, "WebhookOutboundManager::dispatch: delegating webhook delivery via runtime hook");
442 if let Err(e) = (delegate_ext.0)(state, sub, log).await {
443 errors.push(e);
444 }
445 } else {
446 tracing::debug!(subscription_id = %sub.id, "WebhookOutboundManager::dispatch: enqueuing fallback webhook delivery job");
448 if let Some(job_client) = crate::job::global_job_client() {
449 let job_payload = serde_json::json!({
450 "log_id": log.id.clone(),
451 });
452 if let Err(e) = job_client
453 .enqueue("autumn_webhook_delivery", job_payload)
454 .await
455 {
456 errors.push(
457 self.record_delivery_enqueue_failure(log, e.to_string())
458 .await,
459 );
460 }
461 } else {
462 errors.push(
463 self.record_delivery_enqueue_failure(
464 log,
465 "Global job client is unavailable; fallback webhook delivery job not enqueued"
466 .to_owned(),
467 )
468 .await,
469 );
470 }
471 }
472 }
473
474 if !errors.is_empty() {
475 return Err(errors.remove(0));
476 }
477
478 Ok(())
479 }
480
481 async fn record_delivery_enqueue_failure(
482 &self,
483 mut log: WebhookDeliveryLog,
484 message: String,
485 ) -> AutumnError {
486 log.is_dlq = true;
487 log.last_error = Some(message.clone());
488 log.timestamp = Utc::now();
489
490 if let Err(e) = self.handler.replace_delivery_log(log).await {
491 tracing::error!(
492 error = %e,
493 "Failed to mark webhook delivery log as DLQ after enqueue failure"
494 );
495 return e;
496 }
497
498 AutumnError::internal_server_error_msg(message)
499 }
500}
501
502fn install_outbound_webhook_manager(
503 state: &AppState,
504 store: Arc<dyn OutboundWebhookHandler>,
505 initial_backoff_ms: u64,
506) {
507 let manager = WebhookOutboundManager::new(store)
508 .with_initial_backoff_ms(initial_backoff_ms)
509 .with_client_from_state(state);
510 state.insert_extension(manager);
511}
512
513#[must_use]
515#[allow(clippy::redundant_closure_for_method_calls, clippy::too_many_lines)]
516pub fn deliver_webhook_job(
517 state: AppState,
518 payload: serde_json::Value,
519) -> Pin<Box<dyn Future<Output = AutumnResult<()>> + Send + 'static>> {
520 Box::pin(async move {
521 let is_replay = payload
522 .get("replay")
523 .and_then(serde_json::Value::as_bool)
524 .unwrap_or(false);
525 let manager = state.extension::<WebhookOutboundManager>().ok_or_else(|| {
526 AutumnError::internal_server_error_msg("WebhookOutboundManager not found in extensions")
527 })?;
528
529 let (sub, mut log) = if let Some(sub_val) = payload.get("subscription") {
531 let _payload_sub: WebhookSubscription = serde_json::from_value(sub_val.clone())
532 .map_err(|e| {
533 AutumnError::bad_request_msg(format!("failed to parse subscription: {e}"))
534 })?;
535 let mut log: WebhookDeliveryLog = serde_json::from_value(
536 payload
537 .get("log")
538 .cloned()
539 .ok_or_else(|| AutumnError::bad_request_msg("missing log in job payload"))?,
540 )
541 .map_err(|e| AutumnError::bad_request_msg(format!("failed to parse log: {e}")))?;
542
543 if log.response_status.is_some() || log.last_error.is_some() {
546 log.attempt = log.attempt.saturating_add(1);
547 log.response_status = None;
548 log.response_body = None;
549 log.last_error = None;
550 manager.store().log_delivery(log.clone()).await?;
551 }
552
553 let sub = load_current_subscription(&manager, &log).await?;
554 (sub, log)
555 } else {
556 let log_id = payload
557 .get("log_id")
558 .and_then(|v| v.as_str())
559 .ok_or_else(|| AutumnError::bad_request_msg("missing log_id in job payload"))?;
560
561 tracing::debug!(log_id = %log_id, "deliver_webhook_job: starting webhook delivery via log lookup");
562
563 let log_opt = manager.store().get_delivery_log(log_id).await?;
564 let mut log = log_opt.ok_or_else(|| {
565 AutumnError::not_found_msg(format!("delivery log {log_id} not found"))
566 })?;
567
568 if log.response_status.is_some() || log.last_error.is_some() {
571 log.attempt = log.attempt.saturating_add(1);
572 log.response_status = None;
573 log.response_body = None;
574 log.last_error = None;
575 manager.store().log_delivery(log.clone()).await?;
576 }
577
578 let sub = load_current_subscription(&manager, &log).await?;
580 (sub, log)
581 };
582
583 if sub.status == WebhookSubscriptionStatus::Disabled {
584 tracing::info!(subscription_id = %sub.id, "Webhook subscription is disabled; skipping delivery");
585 log.last_error = Some("Subscription is disabled".to_owned());
586 log.timestamp = Utc::now();
587 if is_replay {
588 log.is_dlq = true;
589 }
590 manager.store().log_delivery(log).await?;
591 return Ok(());
592 }
593
594 if sub.status == WebhookSubscriptionStatus::Failed && !is_replay {
595 tracing::info!(subscription_id = %sub.id, "Webhook subscription has failed; skipping delivery");
596 log.last_error = Some("Subscription has failed due to consecutive errors".to_owned());
597 log.timestamp = Utc::now();
598 manager.store().log_delivery(log).await?;
599 return Ok(());
600 }
601 if sub.status == WebhookSubscriptionStatus::Failed {
602 tracing::info!(subscription_id = %sub.id, "Replaying webhook delivery for failed subscription");
603 }
604
605 let timestamp = Utc::now().timestamp();
607 let signing_payload = format!("{timestamp}.{}", log.payload);
608 let signature = crate::security::config::hmac_sha256_hex(
609 sub.secret.as_bytes(),
610 signing_payload.as_bytes(),
611 );
612 let signature_header = format!("t={timestamp},v1={signature}");
613
614 let mut request_headers = HashMap::new();
615 request_headers.insert("Content-Type".to_owned(), "application/json".to_owned());
616 request_headers.insert("Autumn-Signature".to_owned(), signature_header.clone());
617
618 let start = std::time::Instant::now();
619 let req = manager
620 .client
621 .named(&sub.target_url)
622 .post(&sub.target_url)
623 .header("Content-Type", "application/json")
624 .header("Autumn-Signature", signature_header)
625 .text_body(log.payload.clone());
626
627 let response = req.send().await;
628 let elapsed = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
629
630 tracing::debug!(
631 log_id = %log.id,
632 status = ?response.as_ref().map(|r| r.status()),
633 "deliver_webhook_job: webhook HTTP request finished"
634 );
635
636 log.elapsed_ms = elapsed;
637 log.timestamp = Utc::now();
638 log.request_headers = request_headers;
639
640 match response {
641 Ok(res) => {
642 let status = res.status();
643 log.response_status = Some(status.as_u16());
644 let is_success = res.is_success();
645 let body_str = cap_logged_response_body(res.text());
646 log.response_body = Some(body_str);
647
648 if is_success {
649 log.last_error = None;
650 manager.store().log_delivery(log).await?;
651 reset_subscription_after_success(&manager, &sub).await;
652 Ok(())
653 } else {
654 let status_err = format!("server returned status: {status}");
655 log.last_error = Some(status_err.clone());
656 if log.attempt < log.max_attempts {
657 manager.store().log_delivery(log.clone()).await?;
658 }
659 handle_delivery_failure(&manager, &sub, log, status_err).await
660 }
661 }
662 Err(e) => {
663 let error_str = e.to_string();
664 log.last_error = Some(error_str.clone());
665 if log.attempt < log.max_attempts {
666 manager.store().log_delivery(log.clone()).await?;
667 }
668 handle_delivery_failure(&manager, &sub, log, error_str).await
669 }
670 }
671 })
672}
673
674async fn load_current_subscription(
675 manager: &WebhookOutboundManager,
676 log: &WebhookDeliveryLog,
677) -> AutumnResult<WebhookSubscription> {
678 manager
679 .store()
680 .get_subscription(&log.subscription_id)
681 .await?
682 .ok_or_else(|| {
683 AutumnError::not_found_msg(format!("subscription {} not found", log.subscription_id))
684 })
685}
686
687fn cap_logged_response_body(mut body: String) -> String {
688 if body.len() <= MAX_LOGGED_RESPONSE_BODY_BYTES {
689 return body;
690 }
691
692 let body_budget =
693 MAX_LOGGED_RESPONSE_BODY_BYTES.saturating_sub(TRUNCATED_RESPONSE_BODY_SUFFIX.len());
694 let mut cutoff = body_budget.min(body.len());
695 while cutoff > 0 && !body.is_char_boundary(cutoff) {
696 cutoff -= 1;
697 }
698 body.truncate(cutoff);
699 body.push_str(TRUNCATED_RESPONSE_BODY_SUFFIX);
700 body
701}
702
703async fn reset_subscription_after_success(
704 manager: &WebhookOutboundManager,
705 sub: &WebhookSubscription,
706) {
707 if let Err(e) = manager
708 .store()
709 .reactivate_failed_subscription(&sub.id)
710 .await
711 {
712 tracing::warn!(
713 subscription_id = %sub.id,
714 "Webhook delivery succeeded but subscription failure state could not be reset: {}",
715 e
716 );
717 }
718}
719
720async fn handle_delivery_failure(
721 manager: &WebhookOutboundManager,
722 sub: &WebhookSubscription,
723 mut log: WebhookDeliveryLog,
724 error_msg: String,
725) -> AutumnResult<()> {
726 if log.attempt < log.max_attempts {
727 Err(AutumnError::internal_server_error_msg(format!(
729 "delivery attempt {} failed, scheduled retry: {error_msg}",
730 log.attempt
731 )))
732 } else {
733 log.is_dlq = true;
734 manager.store().log_delivery(log).await?;
735 tracing::warn!(subscription_id = %sub.id, "Webhook delivery failed permanently; sent to DLQ: {}", error_msg);
737 Ok(())
738 }
739}
740
/// Plugin that installs a `WebhookOutboundManager` and registers the
/// webhook delivery job.
pub struct OutboundWebhookPlugin {
    /// Store handed to the manager.
    store: Arc<dyn OutboundWebhookHandler>,
    /// Initial backoff (milliseconds) passed to both the manager and the job.
    initial_backoff_ms: u64,
}
746
747impl OutboundWebhookPlugin {
748 #[must_use]
750 pub fn new(store: Arc<dyn OutboundWebhookHandler>) -> Self {
751 Self {
752 store,
753 initial_backoff_ms: 1000,
754 }
755 }
756
757 #[must_use]
759 pub const fn with_initial_backoff_ms(mut self, ms: u64) -> Self {
760 self.initial_backoff_ms = ms;
761 self
762 }
763}
764
765impl crate::plugin::Plugin for OutboundWebhookPlugin {
766 fn build(self, app: crate::app::AppBuilder) -> crate::app::AppBuilder {
767 let store = self.store;
768 let initial_backoff_ms = self.initial_backoff_ms;
769
770 app.state_initializer(move |state| {
771 install_outbound_webhook_manager(state, store.clone(), initial_backoff_ms);
772 })
773 .jobs(vec![crate::job::JobInfo {
774 name: "autumn_webhook_delivery".to_string(),
775 max_attempts: 10, initial_backoff_ms,
777 uniqueness: None,
778 concurrency: None,
779 handler: deliver_webhook_job,
780 }])
781 }
782}
783
784#[cfg(test)]
785mod tests {
786 use super::*;
787 use crate::http_client::{HttpMockRegistryExt, MockRegistry, MockSetupBuilder};
788 use std::sync::Arc;
789 use std::sync::atomic::{AtomicUsize, Ordering};
790
791 fn mock_builder(registry: Arc<MockRegistry>, alias: &str) -> MockSetupBuilder {
792 MockSetupBuilder {
793 registry,
794 alias: alias.to_owned(),
795 method: None,
796 path: None,
797 }
798 }
799
800 fn sample_subscription(
801 id: &str,
802 target_url: &str,
803 status: WebhookSubscriptionStatus,
804 ) -> WebhookSubscription {
805 WebhookSubscription {
806 id: id.to_owned(),
807 target_url: target_url.to_owned(),
808 event_topics: vec!["orders.created".to_owned()],
809 secret: "my_webhook_signing_secret_32_bytes!!".to_owned(),
810 status,
811 consecutive_failures: if status == WebhookSubscriptionStatus::Failed {
812 50
813 } else {
814 0
815 },
816 }
817 }
818
819 fn sample_log(id: &str, subscription_id: &str) -> WebhookDeliveryLog {
820 WebhookDeliveryLog {
821 id: id.to_owned(),
822 subscription_id: subscription_id.to_owned(),
823 topic: "orders.created".to_owned(),
824 payload: serde_json::json!({ "order_id": "ord_123" }).to_string(),
825 request_headers: HashMap::new(),
826 response_status: None,
827 response_body: None,
828 elapsed_ms: 0,
829 attempt: 1,
830 max_attempts: 5,
831 is_dlq: false,
832 last_error: None,
833 timestamp: Utc::now(),
834 }
835 }
836
    /// The plugin must register exactly one state initializer and no
    /// startup hook.
    #[test]
    fn outbound_webhook_plugin_installs_manager_without_startup_hook() {
        let store = Arc::new(InMemoryOutboundWebhookHandler::new());
        let builder = crate::app().plugin(OutboundWebhookPlugin::new(store));

        assert!(
            builder.startup_hooks.is_empty(),
            "webhook manager must be installed before job workers start, not from a startup hook"
        );
        assert_eq!(builder.state_initializers.len(), 1);
    }
848
    /// A replay job for a `Failed` subscription performs the HTTP call and,
    /// on success, reactivates the subscription and clears its counter.
    #[tokio::test]
    async fn replay_job_sends_failed_subscription_instead_of_skipping() {
        let state = AppState::for_test();
        let store = Arc::new(InMemoryOutboundWebhookHandler::new());
        let registry = Arc::new(MockRegistry::new());
        let mock = mock_builder(registry.clone(), "http://mock-receiver/webhooks/replay")
            .post("/webhooks/replay")
            .respond_with(200, serde_json::json!({ "received": true }));
        state.insert_extension(HttpMockRegistryExt(registry));
        install_outbound_webhook_manager(&state, store.clone(), 1);

        let sub = sample_subscription(
            "sub_failed",
            "http://mock-receiver/webhooks/replay",
            WebhookSubscriptionStatus::Failed,
        );
        store.create_subscription(sub).await.unwrap();
        // Seed the log without registering a delivery outcome.
        store
            .replace_delivery_log(sample_log("log_replay", "sub_failed"))
            .await
            .unwrap();

        deliver_webhook_job(
            state,
            serde_json::json!({
                "log_id": "log_replay",
                "replay": true,
            }),
        )
        .await
        .unwrap();

        mock.expect_called(1);
        let log = store
            .get_delivery_log("log_replay")
            .await
            .unwrap()
            .expect("log should remain stored");
        assert_eq!(log.response_status, Some(200));
        assert!(!log.is_dlq);
        assert!(log.last_error.is_none());

        let updated_sub = store
            .get_subscription("sub_failed")
            .await
            .unwrap()
            .expect("subscription should remain stored");
        assert_eq!(updated_sub.status, WebhookSubscriptionStatus::Active);
        assert_eq!(updated_sub.consecutive_failures, 0);
    }
899
    /// A replay job for a `Disabled` subscription sends nothing and leaves
    /// the log in the DLQ with the "disabled" error.
    #[tokio::test]
    async fn replay_job_keeps_disabled_subscription_log_in_dlq() {
        let state = AppState::for_test();
        let store = Arc::new(InMemoryOutboundWebhookHandler::new());
        let registry = Arc::new(MockRegistry::new());
        let mock = mock_builder(registry.clone(), "http://mock-receiver/webhooks/disabled")
            .post("/webhooks/disabled")
            .respond_with(200, serde_json::json!({ "received": true }));
        state.insert_extension(HttpMockRegistryExt(registry));
        install_outbound_webhook_manager(&state, store.clone(), 1);

        let sub = sample_subscription(
            "sub_disabled",
            "http://mock-receiver/webhooks/disabled",
            WebhookSubscriptionStatus::Disabled,
        );
        store.create_subscription(sub).await.unwrap();
        store
            .replace_delivery_log(sample_log("log_disabled_replay", "sub_disabled"))
            .await
            .unwrap();

        deliver_webhook_job(
            state,
            serde_json::json!({
                "log_id": "log_disabled_replay",
                "replay": true,
            }),
        )
        .await
        .unwrap();

        mock.expect_called(0);
        let log = store
            .get_delivery_log("log_disabled_replay")
            .await
            .unwrap()
            .expect("log should remain stored");
        assert!(log.is_dlq, "disabled replay must remain visible in DLQ");
        assert_eq!(log.last_error.as_deref(), Some("Subscription is disabled"));
        assert_eq!(log.response_status, None);
    }
942
    /// When the job payload embeds a stale `Active` subscription, the job
    /// must still honour the stored (`Disabled`) one and skip delivery.
    #[tokio::test]
    async fn self_contained_delivery_uses_latest_subscription_state() {
        let state = AppState::for_test();
        let store = Arc::new(InMemoryOutboundWebhookHandler::new());
        let registry = Arc::new(MockRegistry::new());
        let stale_mock = mock_builder(registry.clone(), "http://mock-receiver/webhooks/stale")
            .post("/webhooks/stale")
            .respond_with(200, serde_json::json!({ "received": true }));
        state.insert_extension(HttpMockRegistryExt(registry));
        install_outbound_webhook_manager(&state, store.clone(), 1);

        let stored_sub = sample_subscription(
            "sub_refresh",
            "http://mock-receiver/webhooks/current-disabled",
            WebhookSubscriptionStatus::Disabled,
        );
        store.create_subscription(stored_sub).await.unwrap();
        // Same id as the stored subscription, but outdated URL and status.
        let stale_sub = sample_subscription(
            "sub_refresh",
            "http://mock-receiver/webhooks/stale",
            WebhookSubscriptionStatus::Active,
        );
        let log = sample_log("log_refresh", "sub_refresh");

        deliver_webhook_job(
            state,
            serde_json::json!({
                "subscription": stale_sub,
                "log": log,
            }),
        )
        .await
        .unwrap();

        stale_mock.expect_called(0);
        let stored = store
            .get_delivery_log("log_refresh")
            .await
            .unwrap()
            .expect("delivery log should exist");
        assert_eq!(stored.response_status, None);
        assert_eq!(
            stored.last_error.as_deref(),
            Some("Subscription is disabled")
        );
    }
989
    /// With no global job client available, `dispatch` must return an error,
    /// leave a DLQ log describing the enqueue failure, and not count it as a
    /// delivery failure on the subscription.
    #[tokio::test]
    async fn dispatch_marks_log_dlq_when_fallback_enqueue_fails() {
        // Serialize access to the global job runtime across tests.
        let _guard = crate::job::global_job_runtime_test_lock().lock().await;
        crate::job::clear_global_job_client();

        let state = AppState::for_test();
        let store = Arc::new(InMemoryOutboundWebhookHandler::new());
        let manager = WebhookOutboundManager::new(store.clone()).with_initial_backoff_ms(1);
        let sub = sample_subscription(
            "sub_enqueue_missing",
            "http://mock-receiver/webhooks/enqueue-missing",
            WebhookSubscriptionStatus::Active,
        );
        store.create_subscription(sub).await.unwrap();

        let err = manager
            .dispatch(&state, "orders.created", &serde_json::json!({ "id": 42 }))
            .await
            .expect_err("dispatch should report the missing fallback job runtime");
        assert!(
            err.to_string().contains("not enqueued"),
            "error should describe the enqueue failure: {err}"
        );

        let logs = store.get_delivery_logs().await.unwrap();
        assert_eq!(logs.len(), 1);
        let log = &logs[0];
        assert!(
            log.is_dlq,
            "enqueue failure must leave a replayable DLQ record"
        );
        assert!(
            log.last_error
                .as_deref()
                .is_some_and(|msg| msg.contains("not enqueued")),
            "DLQ log should record enqueue failure: {:?}",
            log.last_error
        );
        assert_eq!(log.response_status, None);

        let sub = store
            .get_subscription("sub_enqueue_missing")
            .await
            .unwrap()
            .expect("subscription should remain stored");
        assert_eq!(sub.consecutive_failures, 0);
    }
1037
    /// A response body larger than the logging cap must be stored truncated
    /// and end with the truncation marker.
    #[tokio::test]
    async fn delivery_log_response_body_is_capped() {
        let state = AppState::for_test();
        let store = Arc::new(InMemoryOutboundWebhookHandler::new());
        let registry = Arc::new(MockRegistry::new());
        let large_body = "x".repeat(MAX_LOGGED_RESPONSE_BODY_BYTES + 1024);
        let _mock = mock_builder(
            registry.clone(),
            "http://mock-receiver/webhooks/large-error",
        )
        .post("/webhooks/large-error")
        .respond_with(500, serde_json::json!({ "error": large_body }));
        state.insert_extension(HttpMockRegistryExt(registry));
        install_outbound_webhook_manager(&state, store.clone(), 1);

        let sub = sample_subscription(
            "sub_large_error",
            "http://mock-receiver/webhooks/large-error",
            WebhookSubscriptionStatus::Active,
        );
        store.create_subscription(sub.clone()).await.unwrap();
        let mut log = sample_log("log_large_error", "sub_large_error");
        // A single attempt makes the failure go straight to the DLQ, so the
        // job returns `Ok` instead of requesting a retry.
        log.max_attempts = 1;

        deliver_webhook_job(
            state,
            serde_json::json!({
                "subscription": sub,
                "log": log,
            }),
        )
        .await
        .unwrap();

        let stored = store
            .get_delivery_log("log_large_error")
            .await
            .unwrap()
            .expect("delivery log should exist");
        let body = stored
            .response_body
            .expect("response body should be logged");
        assert!(
            body.len() <= MAX_LOGGED_RESPONSE_BODY_BYTES,
            "stored response body should be capped, got {} bytes",
            body.len()
        );
        assert!(body.ends_with("[truncated]"));
    }
1087
    /// Test store that only counts how often `log_delivery` is called.
    struct CountingReplacementStore {
        /// Number of `log_delivery` invocations so far.
        log_delivery_calls: AtomicUsize,
    }
1091
1092 impl CountingReplacementStore {
1093 fn new() -> Self {
1094 Self {
1095 log_delivery_calls: AtomicUsize::new(0),
1096 }
1097 }
1098
1099 fn log_delivery_count(&self) -> usize {
1100 self.log_delivery_calls.load(Ordering::SeqCst)
1101 }
1102 }
1103
    // Stub store: every lookup is empty and every write succeeds; only
    // `log_delivery` has a side effect (it bumps the call counter).
    impl OutboundWebhookHandler for CountingReplacementStore {
        fn get_subscriptions(
            &self,
            _topic: &str,
        ) -> Pin<Box<dyn Future<Output = AutumnResult<Vec<WebhookSubscription>>> + Send>> {
            Box::pin(async { Ok(Vec::new()) })
        }

        fn log_delivery(
            &self,
            _log: WebhookDeliveryLog,
        ) -> Pin<Box<dyn Future<Output = AutumnResult<()>> + Send>> {
            // Counted eagerly, at call time, not when the future is polled.
            self.log_delivery_calls.fetch_add(1, Ordering::SeqCst);
            Box::pin(async { Ok(()) })
        }

        fn replace_delivery_log(
            &self,
            _log: WebhookDeliveryLog,
        ) -> Pin<Box<dyn Future<Output = AutumnResult<()>> + Send>> {
            Box::pin(async { Ok(()) })
        }

        fn get_subscription(
            &self,
            _id: &str,
        ) -> Pin<Box<dyn Future<Output = AutumnResult<Option<WebhookSubscription>>> + Send>>
        {
            Box::pin(async { Ok(None) })
        }

        fn get_delivery_log(
            &self,
            _id: &str,
        ) -> Pin<Box<dyn Future<Output = AutumnResult<Option<WebhookDeliveryLog>>> + Send>>
        {
            Box::pin(async { Ok(None) })
        }
    }
1143
    /// Replacing a log, even one carrying a failure outcome, must not be
    /// routed through `log_delivery`.
    #[tokio::test]
    async fn replace_delivery_log_is_not_a_delivery_outcome() {
        let store = CountingReplacementStore::new();
        let mut log = sample_log("log_replace", "sub_replace");
        log.response_status = Some(500);
        log.last_error = Some("server returned status: 500 Internal Server Error".to_owned());
        log.is_dlq = true;

        store.replace_delivery_log(log).await.unwrap();

        assert_eq!(
            store.log_delivery_count(),
            0,
            "plain delivery-log replacement must not call log_delivery"
        );
    }
1160
1161 struct ResetFailingStore {
1162 inner: InMemoryOutboundWebhookHandler,
1163 }
1164
1165 impl ResetFailingStore {
1166 fn new() -> Self {
1167 Self {
1168 inner: InMemoryOutboundWebhookHandler::new(),
1169 }
1170 }
1171
1172 async fn create_subscription(&self, sub: WebhookSubscription) {
1173 self.inner.create_subscription(sub).await.unwrap();
1174 }
1175
1176 async fn delivery_log(&self, id: &str) -> WebhookDeliveryLog {
1177 self.inner
1178 .get_delivery_log(id)
1179 .await
1180 .unwrap()
1181 .expect("delivery log should exist")
1182 }
1183 }
1184
1185 impl OutboundWebhookHandler for ResetFailingStore {
1186 fn get_subscriptions(
1187 &self,
1188 topic: &str,
1189 ) -> Pin<Box<dyn Future<Output = AutumnResult<Vec<WebhookSubscription>>> + Send>> {
1190 <InMemoryOutboundWebhookHandler as OutboundWebhookHandler>::get_subscriptions(
1191 &self.inner,
1192 topic,
1193 )
1194 }
1195
1196 fn log_delivery(
1197 &self,
1198 log: WebhookDeliveryLog,
1199 ) -> Pin<Box<dyn Future<Output = AutumnResult<()>> + Send>> {
1200 <InMemoryOutboundWebhookHandler as OutboundWebhookHandler>::log_delivery(
1201 &self.inner,
1202 log,
1203 )
1204 }
1205
1206 fn replace_delivery_log(
1207 &self,
1208 log: WebhookDeliveryLog,
1209 ) -> Pin<Box<dyn Future<Output = AutumnResult<()>> + Send>> {
1210 <InMemoryOutboundWebhookHandler as OutboundWebhookHandler>::replace_delivery_log(
1211 &self.inner,
1212 log,
1213 )
1214 }
1215
1216 fn get_subscription(
1217 &self,
1218 id: &str,
1219 ) -> Pin<Box<dyn Future<Output = AutumnResult<Option<WebhookSubscription>>> + Send>>
1220 {
1221 <InMemoryOutboundWebhookHandler as OutboundWebhookHandler>::get_subscription(
1222 &self.inner,
1223 id,
1224 )
1225 }
1226
1227 fn get_delivery_log(
1228 &self,
1229 id: &str,
1230 ) -> Pin<Box<dyn Future<Output = AutumnResult<Option<WebhookDeliveryLog>>> + Send>>
1231 {
1232 <InMemoryOutboundWebhookHandler as OutboundWebhookHandler>::get_delivery_log(
1233 &self.inner,
1234 id,
1235 )
1236 }
1237
1238 fn reset_subscription_failures(
1239 &self,
1240 _id: &str,
1241 ) -> Pin<Box<dyn Future<Output = AutumnResult<()>> + Send>> {
1242 Box::pin(async {
1243 Err(AutumnError::internal_server_error_msg(
1244 "reset backend unavailable",
1245 ))
1246 })
1247 }
1248 }
1249
1250 #[tokio::test]
1251 async fn successful_delivery_does_not_retry_when_failure_reset_fails() {
1252 let state = AppState::for_test();
1253 let store = Arc::new(ResetFailingStore::new());
1254 let registry = Arc::new(MockRegistry::new());
1255 let mock = mock_builder(registry.clone(), "http://mock-receiver/webhooks/success")
1256 .post("/webhooks/success")
1257 .respond_with(200, serde_json::json!({ "received": true }));
1258 state.insert_extension(HttpMockRegistryExt(registry));
1259 install_outbound_webhook_manager(&state, store.clone(), 1);
1260
1261 let sub = sample_subscription(
1262 "sub_success",
1263 "http://mock-receiver/webhooks/success",
1264 WebhookSubscriptionStatus::Active,
1265 );
1266 store.create_subscription(sub.clone()).await;
1267 let log = sample_log("log_success", "sub_success");
1268
1269 deliver_webhook_job(
1270 state,
1271 serde_json::json!({
1272 "subscription": sub,
1273 "log": log,
1274 }),
1275 )
1276 .await
1277 .expect("accepted webhook delivery must not be retried because counter reset failed");
1278
1279 mock.expect_called(1);
1280 let persisted = store.delivery_log("log_success").await;
1281 assert_eq!(persisted.response_status, Some(200));
1282 assert!(persisted.last_error.is_none());
1283 }
1284
1285 #[tokio::test]
1286 async fn webhook_manager_uses_http_client_config_base_urls() {
1287 let _guard = crate::job::global_job_runtime_test_lock().lock().await;
1288 crate::job::clear_global_job_client();
1289
1290 let store = Arc::new(InMemoryOutboundWebhookHandler::new());
1291 let plugin = OutboundWebhookPlugin::new(store.clone()).with_initial_backoff_ms(1);
1292 let mut config = crate::config::AutumnConfig::default();
1293 config.http.client.base_urls.insert(
1294 "hook-service".to_owned(),
1295 "http://mock-receiver/base".to_owned(),
1296 );
1297
1298 let mut app_builder = crate::test::TestApp::new().config(config).plugin(plugin);
1299 let mock = app_builder
1300 .http_mock("hook-service")
1301 .post("/base/hook-service")
1302 .respond_with(200, serde_json::json!({ "received": true }));
1303 let app = app_builder.build();
1304 let state = app.state();
1305
1306 let sub = sample_subscription(
1307 "sub_config",
1308 "hook-service",
1309 WebhookSubscriptionStatus::Active,
1310 );
1311 store.create_subscription(sub.clone()).await.unwrap();
1312 let log = sample_log("log_config", "sub_config");
1313
1314 deliver_webhook_job(
1315 state.clone(),
1316 serde_json::json!({
1317 "subscription": sub,
1318 "log": log,
1319 }),
1320 )
1321 .await
1322 .unwrap();
1323
1324 mock.expect_called(1);
1325 crate::job::clear_global_job_client();
1326 }
1327}