Skip to main content

autumn_web/
webhook_outbound.rs

1#![allow(
2    clippy::significant_drop_tightening,
3    clippy::missing_panics_doc,
4    clippy::missing_errors_doc
5)]
6//! Outbound signed webhook delivery with retries, DLQ, and subscription management.
7
8use chrono::{DateTime, Utc};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::future::Future;
12use std::pin::Pin;
13use std::sync::{Arc, RwLock};
14
15use crate::http_client::Client;
16use crate::{AppState, AutumnError, AutumnResult};
17
/// Upper bound, in bytes, on a response body stored in a delivery log.
const MAX_LOGGED_RESPONSE_BODY_BYTES: usize = 16_384;
/// Marker appended to a response body that had to be cut to fit the cap.
const TRUNCATED_RESPONSE_BODY_SUFFIX: &str = "\n[truncated]";
20
21/// The status of a webhook subscription.
22#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
23#[serde(rename_all = "lowercase")]
24pub enum WebhookSubscriptionStatus {
25    Active,
26    Disabled,
27    Failed,
28}
29
30impl WebhookSubscriptionStatus {
31    /// Return the lower-case status label.
32    #[must_use]
33    pub const fn as_str(self) -> &'static str {
34        match self {
35            Self::Active => "active",
36            Self::Disabled => "disabled",
37            Self::Failed => "failed",
38        }
39    }
40}
41
42impl std::fmt::Display for WebhookSubscriptionStatus {
43    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44        f.write_str(self.as_str())
45    }
46}
47
48/// A registered webhook subscription targeting a consumer endpoint.
49#[derive(Debug, Clone, Serialize, Deserialize)]
50pub struct WebhookSubscription {
51    pub id: String,
52    pub target_url: String,
53    pub event_topics: Vec<String>,
54    pub secret: String,
55    pub status: WebhookSubscriptionStatus,
56    pub consecutive_failures: u32,
57}
58
59/// A structured log of an outbound webhook delivery attempt.
60#[derive(Debug, Clone, Serialize, Deserialize)]
61pub struct WebhookDeliveryLog {
62    pub id: String,
63    pub subscription_id: String,
64    pub topic: String,
65    pub payload: String,
66    pub request_headers: HashMap<String, String>,
67    pub response_status: Option<u16>,
68    pub response_body: Option<String>,
69    pub elapsed_ms: u64,
70    pub attempt: u32,
71    pub max_attempts: u32,
72    pub is_dlq: bool,
73    pub last_error: Option<String>,
74    pub timestamp: DateTime<Utc>,
75}
76
77/// Pluggable handler interface for outbound webhook subscriptions and delivery logs.
78pub trait OutboundWebhookHandler: Send + Sync + 'static {
79    /// Retrieve active subscriptions registered for a specific event topic.
80    fn get_subscriptions(
81        &self,
82        topic: &str,
83    ) -> Pin<Box<dyn Future<Output = AutumnResult<Vec<WebhookSubscription>>> + Send>>;
84
85    /// Log a webhook delivery attempt and handle failure counters/statuses.
86    fn log_delivery(
87        &self,
88        log: WebhookDeliveryLog,
89    ) -> Pin<Box<dyn Future<Output = AutumnResult<()>> + Send>>;
90
91    /// Replace a stored delivery log without treating it as a new delivery outcome.
92    ///
93    /// Implementations must perform a plain record replacement. This must not
94    /// update subscription failure counters or auto-failure state.
95    fn replace_delivery_log(
96        &self,
97        log: WebhookDeliveryLog,
98    ) -> Pin<Box<dyn Future<Output = AutumnResult<()>> + Send>>;
99
100    /// Retrieve a specific webhook subscription by ID (regardless of status/active state).
101    fn get_subscription(
102        &self,
103        id: &str,
104    ) -> Pin<Box<dyn Future<Output = AutumnResult<Option<WebhookSubscription>>> + Send>>;
105
106    /// Optional: List only permanently failed delivery attempts archived in the Dead Letter Queue.
107    fn get_dlq_logs(
108        &self,
109    ) -> Pin<Box<dyn Future<Output = AutumnResult<Vec<WebhookDeliveryLog>>> + Send>> {
110        Box::pin(async { Ok(Vec::new()) })
111    }
112
113    /// Get a specific delivery log by ID.
114    fn get_delivery_log(
115        &self,
116        id: &str,
117    ) -> Pin<Box<dyn Future<Output = AutumnResult<Option<WebhookDeliveryLog>>> + Send>>;
118
119    /// Optional: Reset consecutive failures for a subscription.
120    fn reset_subscription_failures(
121        &self,
122        _id: &str,
123    ) -> Pin<Box<dyn Future<Output = AutumnResult<()>> + Send>> {
124        Box::pin(async { Ok(()) })
125    }
126
127    /// Optional: Reactivate a subscription that was auto-marked as failed.
128    ///
129    /// Manual DLQ replays need to bypass the automatic failure guard without
130    /// re-enabling subscriptions that an operator explicitly disabled.
131    fn reactivate_failed_subscription(
132        &self,
133        id: &str,
134    ) -> Pin<Box<dyn Future<Output = AutumnResult<()>> + Send>> {
135        self.reset_subscription_failures(id)
136    }
137}
138
139/// Legacy alias for backward compatibility.
140pub use OutboundWebhookHandler as OutboundWebhookStore;
141
142/// Bounded, thread-safe, process-local in-memory implementation of the outbound webhook handler.
143#[derive(Debug, Default)]
144pub struct InMemoryOutboundWebhookHandler {
145    subscriptions: RwLock<HashMap<String, WebhookSubscription>>,
146    logs: RwLock<HashMap<String, WebhookDeliveryLog>>,
147}
148
149/// Legacy alias for backward compatibility.
150pub type InMemoryOutboundWebhookStore = InMemoryOutboundWebhookHandler;
151
152impl InMemoryOutboundWebhookHandler {
153    /// Create a new, empty in-memory handler.
154    #[must_use]
155    pub fn new() -> Self {
156        Self::default()
157    }
158
159    /// Helper to register a subscription in memory for testing/dev.
160    #[allow(clippy::unused_async)]
161    pub async fn create_subscription(
162        &self,
163        sub: WebhookSubscription,
164    ) -> AutumnResult<WebhookSubscription> {
165        let mut subs = self
166            .subscriptions
167            .write()
168            .expect("subscriptions write lock poisoned");
169        subs.insert(sub.id.clone(), sub.clone());
170        Ok(sub)
171    }
172
173    /// Helper to retrieve logged deliveries for testing/dev.
174    #[allow(clippy::unused_async)]
175    pub async fn get_delivery_logs(&self) -> AutumnResult<Vec<WebhookDeliveryLog>> {
176        let logs = self.logs.read().expect("logs read lock poisoned");
177        let mut list: Vec<WebhookDeliveryLog> = logs.values().cloned().collect();
178        list.sort_by_key(|l| l.timestamp);
179        list.reverse();
180        Ok(list)
181    }
182
183    /// Helper to fetch a single subscription.
184    #[allow(clippy::unused_async)]
185    pub async fn get_subscription(&self, id: &str) -> AutumnResult<Option<WebhookSubscription>> {
186        let subs = self
187            .subscriptions
188            .read()
189            .expect("subscriptions read lock poisoned");
190        Ok(subs.get(id).cloned())
191    }
192}
193
194impl OutboundWebhookHandler for InMemoryOutboundWebhookHandler {
195    fn get_subscriptions(
196        &self,
197        topic: &str,
198    ) -> Pin<Box<dyn Future<Output = AutumnResult<Vec<WebhookSubscription>>> + Send>> {
199        let subs = self
200            .subscriptions
201            .read()
202            .expect("subscriptions read lock poisoned");
203        let topic = topic.to_owned();
204        let list: Vec<WebhookSubscription> = subs
205            .values()
206            .filter(|sub| {
207                sub.event_topics.iter().any(|t| t == &topic)
208                    && sub.status == WebhookSubscriptionStatus::Active
209            })
210            .cloned()
211            .collect();
212        Box::pin(async move { Ok(list) })
213    }
214
215    fn get_subscription(
216        &self,
217        id: &str,
218    ) -> Pin<Box<dyn Future<Output = AutumnResult<Option<WebhookSubscription>>> + Send>> {
219        let subs = self
220            .subscriptions
221            .read()
222            .expect("subscriptions read lock poisoned");
223        let sub = subs.get(id).cloned();
224        Box::pin(async move { Ok(sub) })
225    }
226
227    fn log_delivery(
228        &self,
229        log: WebhookDeliveryLog,
230    ) -> Pin<Box<dyn Future<Output = AutumnResult<()>> + Send>> {
231        let mut logs = self.logs.write().expect("logs write lock poisoned");
232        logs.insert(log.id.clone(), log.clone());
233
234        // Manage subscription consecutive failures and auto-disabling state
235        let mut subs = self
236            .subscriptions
237            .write()
238            .expect("subscriptions write lock poisoned");
239        if let Some(sub) = subs.get_mut(&log.subscription_id) {
240            let is_active = sub.status == WebhookSubscriptionStatus::Active;
241            if is_active {
242                if let Some(status) = log.response_status {
243                    if (200..300).contains(&status) {
244                        sub.consecutive_failures = 0;
245                    } else {
246                        sub.consecutive_failures = sub.consecutive_failures.saturating_add(1);
247                        if sub.consecutive_failures >= 50 {
248                            sub.status = WebhookSubscriptionStatus::Failed;
249                            tracing::warn!(subscription_id = %sub.id, "Webhook subscription auto-disabled due to 50 consecutive failures");
250                        }
251                    }
252                } else if log.last_error.is_some() {
253                    sub.consecutive_failures = sub.consecutive_failures.saturating_add(1);
254                    if sub.consecutive_failures >= 50 {
255                        sub.status = WebhookSubscriptionStatus::Failed;
256                        tracing::warn!(subscription_id = %sub.id, "Webhook subscription auto-disabled due to 50 consecutive failures");
257                    }
258                }
259            }
260        }
261
262        Box::pin(async move { Ok(()) })
263    }
264
265    fn replace_delivery_log(
266        &self,
267        log: WebhookDeliveryLog,
268    ) -> Pin<Box<dyn Future<Output = AutumnResult<()>> + Send>> {
269        let mut logs = self.logs.write().expect("logs write lock poisoned");
270        logs.insert(log.id.clone(), log);
271        Box::pin(async move { Ok(()) })
272    }
273
274    fn get_dlq_logs(
275        &self,
276    ) -> Pin<Box<dyn Future<Output = AutumnResult<Vec<WebhookDeliveryLog>>> + Send>> {
277        let list = {
278            let logs = self.logs.read().expect("logs read lock poisoned");
279            let mut list: Vec<WebhookDeliveryLog> =
280                logs.values().filter(|l| l.is_dlq).cloned().collect();
281            list.sort_by_key(|l| l.timestamp);
282            list.reverse();
283            list
284        };
285        Box::pin(async move { Ok(list) })
286    }
287
288    fn get_delivery_log(
289        &self,
290        id: &str,
291    ) -> Pin<Box<dyn Future<Output = AutumnResult<Option<WebhookDeliveryLog>>> + Send>> {
292        let log = self
293            .logs
294            .read()
295            .expect("logs read lock poisoned")
296            .get(id)
297            .cloned();
298        Box::pin(async move { Ok(log) })
299    }
300
301    fn reset_subscription_failures(
302        &self,
303        id: &str,
304    ) -> Pin<Box<dyn Future<Output = AutumnResult<()>> + Send>> {
305        {
306            let mut subs = self
307                .subscriptions
308                .write()
309                .expect("subscriptions write lock poisoned");
310            if let Some(sub) = subs.get_mut(id) {
311                sub.consecutive_failures = 0;
312            }
313        }
314        Box::pin(async move { Ok(()) })
315    }
316
317    fn reactivate_failed_subscription(
318        &self,
319        id: &str,
320    ) -> Pin<Box<dyn Future<Output = AutumnResult<()>> + Send>> {
321        {
322            let mut subs = self
323                .subscriptions
324                .write()
325                .expect("subscriptions write lock poisoned");
326            if let Some(sub) = subs.get_mut(id) {
327                sub.consecutive_failures = 0;
328                if sub.status == WebhookSubscriptionStatus::Failed {
329                    sub.status = WebhookSubscriptionStatus::Active;
330                }
331            }
332        }
333        Box::pin(async move { Ok(()) })
334    }
335}
336
337/// A runtime delegation callback type to bridge core autumn to autumn-harvest dynamically.
338pub type WebhookDelegate = Arc<
339    dyn Fn(
340            &AppState,
341            WebhookSubscription,
342            WebhookDeliveryLog,
343        ) -> Pin<Box<dyn Future<Output = AutumnResult<()>> + Send>>
344        + Send
345        + Sync,
346>;
347
348/// `AppState` extension for the runtime delegation hook.
349#[derive(Clone)]
350pub struct WebhookDelegateExt(pub WebhookDelegate);
351
352/// The runtime manager for outbound webhooks.
353#[derive(Clone)]
354pub struct WebhookOutboundManager {
355    handler: Arc<dyn OutboundWebhookHandler>,
356    client: Client,
357    initial_backoff_ms: u64,
358}
359
360impl WebhookOutboundManager {
361    /// Create a new webhook manager with a handler.
362    pub fn new(handler: Arc<dyn OutboundWebhookHandler>) -> Self {
363        Self {
364            handler,
365            client: Client::new(),
366            initial_backoff_ms: 1000,
367        }
368    }
369
370    /// Set a custom initial backoff for retries.
371    #[must_use]
372    pub const fn with_initial_backoff_ms(mut self, ms: u64) -> Self {
373        self.initial_backoff_ms = ms;
374        self
375    }
376
377    fn with_client_from_state(mut self, state: &AppState) -> Self {
378        self.client = Client::from_state(state);
379        self
380    }
381
382    /// Access the underlying webhook handler (compatibility/actuator support).
383    #[must_use]
384    pub fn store(&self) -> &Arc<dyn OutboundWebhookHandler> {
385        &self.handler
386    }
387
388    /// Access the underlying http client.
389    #[must_use]
390    pub const fn client(&self) -> &Client {
391        &self.client
392    }
393
394    /// Dispatch a signed webhook payload to all subscriptions interested in `topic`.
395    ///
396    /// # Errors
397    ///
398    /// Returns [`AutumnError`] if payload serialization or queueing fails.
399    pub async fn dispatch<T: Serialize + Sync>(
400        &self,
401        state: &AppState,
402        topic: &str,
403        payload: &T,
404    ) -> AutumnResult<()> {
405        let serialized = serde_json::to_string(payload).map_err(|e| {
406            AutumnError::internal_server_error_msg(format!("failed to serialize payload: {e}"))
407        })?;
408
409        let mut errors = Vec::new();
410        let subs = self.handler.get_subscriptions(topic).await?;
411        for sub in subs {
412            if sub.status == WebhookSubscriptionStatus::Disabled {
413                continue;
414            }
415
416            let log_id = uuid::Uuid::new_v4().to_string();
417            let log = WebhookDeliveryLog {
418                id: log_id.clone(),
419                subscription_id: sub.id.clone(),
420                topic: topic.to_owned(),
421                payload: serialized.clone(),
422                request_headers: HashMap::new(),
423                response_status: None,
424                response_body: None,
425                elapsed_ms: 0,
426                attempt: 1,
427                max_attempts: 5,
428                is_dlq: false,
429                last_error: None,
430                timestamp: Utc::now(),
431            };
432
433            // Register the initial attempt in local storage
434            if let Err(e) = self.handler.log_delivery(log.clone()).await {
435                errors.push(e);
436                continue;
437            }
438
439            // If a delegate extension is registered, run it (delegates to Harvest workflow)
440            if let Some(delegate_ext) = state.extension::<WebhookDelegateExt>() {
441                tracing::info!(subscription_id = %sub.id, "WebhookOutboundManager::dispatch: delegating webhook delivery via runtime hook");
442                if let Err(e) = (delegate_ext.0)(state, sub, log).await {
443                    errors.push(e);
444                }
445            } else {
446                // Fallback: enqueue a standard background job
447                tracing::debug!(subscription_id = %sub.id, "WebhookOutboundManager::dispatch: enqueuing fallback webhook delivery job");
448                if let Some(job_client) = crate::job::global_job_client() {
449                    let job_payload = serde_json::json!({
450                        "log_id": log.id.clone(),
451                    });
452                    if let Err(e) = job_client
453                        .enqueue("autumn_webhook_delivery", job_payload)
454                        .await
455                    {
456                        errors.push(
457                            self.record_delivery_enqueue_failure(log, e.to_string())
458                                .await,
459                        );
460                    }
461                } else {
462                    errors.push(
463                        self.record_delivery_enqueue_failure(
464                            log,
465                            "Global job client is unavailable; fallback webhook delivery job not enqueued"
466                                .to_owned(),
467                        )
468                        .await,
469                    );
470                }
471            }
472        }
473
474        if !errors.is_empty() {
475            return Err(errors.remove(0));
476        }
477
478        Ok(())
479    }
480
481    async fn record_delivery_enqueue_failure(
482        &self,
483        mut log: WebhookDeliveryLog,
484        message: String,
485    ) -> AutumnError {
486        log.is_dlq = true;
487        log.last_error = Some(message.clone());
488        log.timestamp = Utc::now();
489
490        if let Err(e) = self.handler.replace_delivery_log(log).await {
491            tracing::error!(
492                error = %e,
493                "Failed to mark webhook delivery log as DLQ after enqueue failure"
494            );
495            return e;
496        }
497
498        AutumnError::internal_server_error_msg(message)
499    }
500}
501
502fn install_outbound_webhook_manager(
503    state: &AppState,
504    store: Arc<dyn OutboundWebhookHandler>,
505    initial_backoff_ms: u64,
506) {
507    let manager = WebhookOutboundManager::new(store)
508        .with_initial_backoff_ms(initial_backoff_ms)
509        .with_client_from_state(state);
510    state.insert_extension(manager);
511}
512
513/// Asynchronous background job that delivers a webhook payload (legacy fallback).
514#[must_use]
515#[allow(clippy::redundant_closure_for_method_calls, clippy::too_many_lines)]
516pub fn deliver_webhook_job(
517    state: AppState,
518    payload: serde_json::Value,
519) -> Pin<Box<dyn Future<Output = AutumnResult<()>> + Send + 'static>> {
520    Box::pin(async move {
521        let is_replay = payload
522            .get("replay")
523            .and_then(serde_json::Value::as_bool)
524            .unwrap_or(false);
525        let manager = state.extension::<WebhookOutboundManager>().ok_or_else(|| {
526            AutumnError::internal_server_error_msg("WebhookOutboundManager not found in extensions")
527        })?;
528
529        // Support both self-contained payload structure and legacy log_id lookup (for replays)
530        let (sub, mut log) = if let Some(sub_val) = payload.get("subscription") {
531            let _payload_sub: WebhookSubscription = serde_json::from_value(sub_val.clone())
532                .map_err(|e| {
533                    AutumnError::bad_request_msg(format!("failed to parse subscription: {e}"))
534                })?;
535            let mut log: WebhookDeliveryLog = serde_json::from_value(
536                payload
537                    .get("log")
538                    .cloned()
539                    .ok_or_else(|| AutumnError::bad_request_msg("missing log in job payload"))?,
540            )
541            .map_err(|e| AutumnError::bad_request_msg(format!("failed to parse log: {e}")))?;
542
543            // If this log has already been attempted (i.e. is running a retry from the job runner),
544            // increment the attempt counter and write the pre-send log.
545            if log.response_status.is_some() || log.last_error.is_some() {
546                log.attempt = log.attempt.saturating_add(1);
547                log.response_status = None;
548                log.response_body = None;
549                log.last_error = None;
550                manager.store().log_delivery(log.clone()).await?;
551            }
552
553            let sub = load_current_subscription(&manager, &log).await?;
554            (sub, log)
555        } else {
556            let log_id = payload
557                .get("log_id")
558                .and_then(|v| v.as_str())
559                .ok_or_else(|| AutumnError::bad_request_msg("missing log_id in job payload"))?;
560
561            tracing::debug!(log_id = %log_id, "deliver_webhook_job: starting webhook delivery via log lookup");
562
563            let log_opt = manager.store().get_delivery_log(log_id).await?;
564            let mut log = log_opt.ok_or_else(|| {
565                AutumnError::not_found_msg(format!("delivery log {log_id} not found"))
566            })?;
567
568            // If this log has already been attempted (i.e. is running a retry from the job runner),
569            // increment the attempt counter and write the pre-send log.
570            if log.response_status.is_some() || log.last_error.is_some() {
571                log.attempt = log.attempt.saturating_add(1);
572                log.response_status = None;
573                log.response_body = None;
574                log.last_error = None;
575                manager.store().log_delivery(log.clone()).await?;
576            }
577
578            // Load latest subscription state to respect emergency rotations/disable
579            let sub = load_current_subscription(&manager, &log).await?;
580            (sub, log)
581        };
582
583        if sub.status == WebhookSubscriptionStatus::Disabled {
584            tracing::info!(subscription_id = %sub.id, "Webhook subscription is disabled; skipping delivery");
585            log.last_error = Some("Subscription is disabled".to_owned());
586            log.timestamp = Utc::now();
587            if is_replay {
588                log.is_dlq = true;
589            }
590            manager.store().log_delivery(log).await?;
591            return Ok(());
592        }
593
594        if sub.status == WebhookSubscriptionStatus::Failed && !is_replay {
595            tracing::info!(subscription_id = %sub.id, "Webhook subscription has failed; skipping delivery");
596            log.last_error = Some("Subscription has failed due to consecutive errors".to_owned());
597            log.timestamp = Utc::now();
598            manager.store().log_delivery(log).await?;
599            return Ok(());
600        }
601        if sub.status == WebhookSubscriptionStatus::Failed {
602            tracing::info!(subscription_id = %sub.id, "Replaying webhook delivery for failed subscription");
603        }
604
605        // Stripe-style payload signing: t=<timestamp>,v1=<signature>
606        let timestamp = Utc::now().timestamp();
607        let signing_payload = format!("{timestamp}.{}", log.payload);
608        let signature = crate::security::config::hmac_sha256_hex(
609            sub.secret.as_bytes(),
610            signing_payload.as_bytes(),
611        );
612        let signature_header = format!("t={timestamp},v1={signature}");
613
614        let mut request_headers = HashMap::new();
615        request_headers.insert("Content-Type".to_owned(), "application/json".to_owned());
616        request_headers.insert("Autumn-Signature".to_owned(), signature_header.clone());
617
618        let start = std::time::Instant::now();
619        let req = manager
620            .client
621            .named(&sub.target_url)
622            .post(&sub.target_url)
623            .header("Content-Type", "application/json")
624            .header("Autumn-Signature", signature_header)
625            .text_body(log.payload.clone());
626
627        let response = req.send().await;
628        let elapsed = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
629
630        tracing::debug!(
631            log_id = %log.id,
632            status = ?response.as_ref().map(|r| r.status()),
633            "deliver_webhook_job: webhook HTTP request finished"
634        );
635
636        log.elapsed_ms = elapsed;
637        log.timestamp = Utc::now();
638        log.request_headers = request_headers;
639
640        match response {
641            Ok(res) => {
642                let status = res.status();
643                log.response_status = Some(status.as_u16());
644                let is_success = res.is_success();
645                let body_str = cap_logged_response_body(res.text());
646                log.response_body = Some(body_str);
647
648                if is_success {
649                    log.last_error = None;
650                    manager.store().log_delivery(log).await?;
651                    reset_subscription_after_success(&manager, &sub).await;
652                    Ok(())
653                } else {
654                    let status_err = format!("server returned status: {status}");
655                    log.last_error = Some(status_err.clone());
656                    if log.attempt < log.max_attempts {
657                        manager.store().log_delivery(log.clone()).await?;
658                    }
659                    handle_delivery_failure(&manager, &sub, log, status_err).await
660                }
661            }
662            Err(e) => {
663                let error_str = e.to_string();
664                log.last_error = Some(error_str.clone());
665                if log.attempt < log.max_attempts {
666                    manager.store().log_delivery(log.clone()).await?;
667                }
668                handle_delivery_failure(&manager, &sub, log, error_str).await
669            }
670        }
671    })
672}
673
674async fn load_current_subscription(
675    manager: &WebhookOutboundManager,
676    log: &WebhookDeliveryLog,
677) -> AutumnResult<WebhookSubscription> {
678    manager
679        .store()
680        .get_subscription(&log.subscription_id)
681        .await?
682        .ok_or_else(|| {
683            AutumnError::not_found_msg(format!("subscription {} not found", log.subscription_id))
684        })
685}
686
687fn cap_logged_response_body(mut body: String) -> String {
688    if body.len() <= MAX_LOGGED_RESPONSE_BODY_BYTES {
689        return body;
690    }
691
692    let body_budget =
693        MAX_LOGGED_RESPONSE_BODY_BYTES.saturating_sub(TRUNCATED_RESPONSE_BODY_SUFFIX.len());
694    let mut cutoff = body_budget.min(body.len());
695    while cutoff > 0 && !body.is_char_boundary(cutoff) {
696        cutoff -= 1;
697    }
698    body.truncate(cutoff);
699    body.push_str(TRUNCATED_RESPONSE_BODY_SUFFIX);
700    body
701}
702
703async fn reset_subscription_after_success(
704    manager: &WebhookOutboundManager,
705    sub: &WebhookSubscription,
706) {
707    if let Err(e) = manager
708        .store()
709        .reactivate_failed_subscription(&sub.id)
710        .await
711    {
712        tracing::warn!(
713            subscription_id = %sub.id,
714            "Webhook delivery succeeded but subscription failure state could not be reset: {}",
715            e
716        );
717    }
718}
719
720async fn handle_delivery_failure(
721    manager: &WebhookOutboundManager,
722    sub: &WebhookSubscription,
723    mut log: WebhookDeliveryLog,
724    error_msg: String,
725) -> AutumnResult<()> {
726    if log.attempt < log.max_attempts {
727        // Return an error to signal the background job runner to retry this job
728        Err(AutumnError::internal_server_error_msg(format!(
729            "delivery attempt {} failed, scheduled retry: {error_msg}",
730            log.attempt
731        )))
732    } else {
733        log.is_dlq = true;
734        manager.store().log_delivery(log).await?;
735        // Return Ok(()) to mark the permanently failed job as complete and send to DLQ
736        tracing::warn!(subscription_id = %sub.id, "Webhook delivery failed permanently; sent to DLQ: {}", error_msg);
737        Ok(())
738    }
739}
740
741/// `AppBuilder` plugin for outbound signed webhook delivery infrastructure.
742pub struct OutboundWebhookPlugin {
743    store: Arc<dyn OutboundWebhookHandler>,
744    initial_backoff_ms: u64,
745}
746
747impl OutboundWebhookPlugin {
748    /// Create a new outbound webhook plugin using the specified store.
749    #[must_use]
750    pub fn new(store: Arc<dyn OutboundWebhookHandler>) -> Self {
751        Self {
752            store,
753            initial_backoff_ms: 1000,
754        }
755    }
756
757    /// Override the initial backoff retry delay.
758    #[must_use]
759    pub const fn with_initial_backoff_ms(mut self, ms: u64) -> Self {
760        self.initial_backoff_ms = ms;
761        self
762    }
763}
764
765impl crate::plugin::Plugin for OutboundWebhookPlugin {
766    fn build(self, app: crate::app::AppBuilder) -> crate::app::AppBuilder {
767        let store = self.store;
768        let initial_backoff_ms = self.initial_backoff_ms;
769
770        app.state_initializer(move |state| {
771            install_outbound_webhook_manager(state, store.clone(), initial_backoff_ms);
772        })
773        .jobs(vec![crate::job::JobInfo {
774            name: "autumn_webhook_delivery".to_string(),
775            max_attempts: 10, // Retries are handled durably via the background job engine
776            initial_backoff_ms,
777            uniqueness: None,
778            concurrency: None,
779            handler: deliver_webhook_job,
780        }])
781    }
782}
783
#[cfg(test)]
mod tests {
    //! Unit tests for outbound webhook delivery: replay handling, DLQ marking,
    //! response-body capping, config-driven base URLs, and store-hook failures.

    // `Arc`, `HashMap`, `Utc`, `Pin` and `Future` come from the parent module's
    // imports via this glob.
    use super::*;
    use crate::http_client::{HttpMockRegistryExt, MockRegistry, MockSetupBuilder};
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Clears the process-global job client when dropped.
    ///
    /// Tests that touch the global job runtime keep one alive for their whole
    /// body, so the runtime is reset even when an assertion panics, not only
    /// on the happy path.
    struct ClearGlobalJobClientOnDrop;

    impl Drop for ClearGlobalJobClientOnDrop {
        fn drop(&mut self) {
            crate::job::clear_global_job_client();
        }
    }

    /// Builds a `MockSetupBuilder` for `alias` with no method or path matcher
    /// set yet; callers chain `.post(...)` and `.respond_with(...)` onto it.
    fn mock_builder(registry: Arc<MockRegistry>, alias: &str) -> MockSetupBuilder {
        MockSetupBuilder {
            registry,
            alias: alias.to_owned(),
            method: None,
            path: None,
        }
    }

    /// Returns a subscription to `orders.created` with a fixed signing secret.
    ///
    /// A `Failed` subscription starts with 50 consecutive failures; any other
    /// status starts with 0.
    fn sample_subscription(
        id: &str,
        target_url: &str,
        status: WebhookSubscriptionStatus,
    ) -> WebhookSubscription {
        WebhookSubscription {
            id: id.to_owned(),
            target_url: target_url.to_owned(),
            event_topics: vec!["orders.created".to_owned()],
            secret: "my_webhook_signing_secret_32_bytes!!".to_owned(),
            status,
            consecutive_failures: if status == WebhookSubscriptionStatus::Failed {
                50
            } else {
                0
            },
        }
    }

    /// Returns a first-attempt `orders.created` delivery log with no response
    /// recorded, no error, not in the DLQ, and `max_attempts` set to 5.
    fn sample_log(id: &str, subscription_id: &str) -> WebhookDeliveryLog {
        WebhookDeliveryLog {
            id: id.to_owned(),
            subscription_id: subscription_id.to_owned(),
            topic: "orders.created".to_owned(),
            payload: serde_json::json!({ "order_id": "ord_123" }).to_string(),
            request_headers: HashMap::new(),
            response_status: None,
            response_body: None,
            elapsed_ms: 0,
            attempt: 1,
            max_attempts: 5,
            is_dlq: false,
            last_error: None,
            timestamp: Utc::now(),
        }
    }

    /// Installing the plugin registers exactly one state initializer and no
    /// startup hook.
    #[test]
    fn outbound_webhook_plugin_installs_manager_without_startup_hook() {
        let store = Arc::new(InMemoryOutboundWebhookHandler::new());
        let builder = crate::app().plugin(OutboundWebhookPlugin::new(store));

        assert!(
            builder.startup_hooks.is_empty(),
            "webhook manager must be installed before job workers start, not from a startup hook"
        );
        assert_eq!(builder.state_initializers.len(), 1);
    }

    /// A replay job for a `Failed` subscription sends the request. On a 200
    /// response it clears the log's DLQ/error state and reactivates the
    /// subscription with its failure counter reset.
    #[tokio::test]
    async fn replay_job_sends_failed_subscription_instead_of_skipping() {
        let state = AppState::for_test();
        let store = Arc::new(InMemoryOutboundWebhookHandler::new());
        let registry = Arc::new(MockRegistry::new());
        let mock = mock_builder(registry.clone(), "http://mock-receiver/webhooks/replay")
            .post("/webhooks/replay")
            .respond_with(200, serde_json::json!({ "received": true }));
        state.insert_extension(HttpMockRegistryExt(registry));
        install_outbound_webhook_manager(&state, store.clone(), 1);

        let sub = sample_subscription(
            "sub_failed",
            "http://mock-receiver/webhooks/replay",
            WebhookSubscriptionStatus::Failed,
        );
        store.create_subscription(sub).await.unwrap();
        store
            .replace_delivery_log(sample_log("log_replay", "sub_failed"))
            .await
            .unwrap();

        deliver_webhook_job(
            state,
            serde_json::json!({
                "log_id": "log_replay",
                "replay": true,
            }),
        )
        .await
        .unwrap();

        mock.expect_called(1);
        let log = store
            .get_delivery_log("log_replay")
            .await
            .unwrap()
            .expect("log should remain stored");
        assert_eq!(log.response_status, Some(200));
        assert!(!log.is_dlq);
        assert!(log.last_error.is_none());

        let updated_sub = store
            .get_subscription("sub_failed")
            .await
            .unwrap()
            .expect("subscription should remain stored");
        assert_eq!(updated_sub.status, WebhookSubscriptionStatus::Active);
        assert_eq!(updated_sub.consecutive_failures, 0);
    }

    /// A replay job for a `Disabled` subscription sends nothing and leaves the
    /// log in the DLQ with a "Subscription is disabled" error.
    #[tokio::test]
    async fn replay_job_keeps_disabled_subscription_log_in_dlq() {
        let state = AppState::for_test();
        let store = Arc::new(InMemoryOutboundWebhookHandler::new());
        let registry = Arc::new(MockRegistry::new());
        let mock = mock_builder(registry.clone(), "http://mock-receiver/webhooks/disabled")
            .post("/webhooks/disabled")
            .respond_with(200, serde_json::json!({ "received": true }));
        state.insert_extension(HttpMockRegistryExt(registry));
        install_outbound_webhook_manager(&state, store.clone(), 1);

        let sub = sample_subscription(
            "sub_disabled",
            "http://mock-receiver/webhooks/disabled",
            WebhookSubscriptionStatus::Disabled,
        );
        store.create_subscription(sub).await.unwrap();
        store
            .replace_delivery_log(sample_log("log_disabled_replay", "sub_disabled"))
            .await
            .unwrap();

        deliver_webhook_job(
            state,
            serde_json::json!({
                "log_id": "log_disabled_replay",
                "replay": true,
            }),
        )
        .await
        .unwrap();

        mock.expect_called(0);
        let log = store
            .get_delivery_log("log_disabled_replay")
            .await
            .unwrap()
            .expect("log should remain stored");
        assert!(log.is_dlq, "disabled replay must remain visible in DLQ");
        assert_eq!(log.last_error.as_deref(), Some("Subscription is disabled"));
        assert_eq!(log.response_status, None);
    }

    /// A self-contained job payload carries a stale `Active` subscription, but
    /// the stored copy is `Disabled`. Delivery must follow the stored copy: no
    /// request to the stale URL, and a "Subscription is disabled" error is logged.
    #[tokio::test]
    async fn self_contained_delivery_uses_latest_subscription_state() {
        let state = AppState::for_test();
        let store = Arc::new(InMemoryOutboundWebhookHandler::new());
        let registry = Arc::new(MockRegistry::new());
        let stale_mock = mock_builder(registry.clone(), "http://mock-receiver/webhooks/stale")
            .post("/webhooks/stale")
            .respond_with(200, serde_json::json!({ "received": true }));
        state.insert_extension(HttpMockRegistryExt(registry));
        install_outbound_webhook_manager(&state, store.clone(), 1);

        let stored_sub = sample_subscription(
            "sub_refresh",
            "http://mock-receiver/webhooks/current-disabled",
            WebhookSubscriptionStatus::Disabled,
        );
        store.create_subscription(stored_sub).await.unwrap();
        let stale_sub = sample_subscription(
            "sub_refresh",
            "http://mock-receiver/webhooks/stale",
            WebhookSubscriptionStatus::Active,
        );
        let log = sample_log("log_refresh", "sub_refresh");

        deliver_webhook_job(
            state,
            serde_json::json!({
                "subscription": stale_sub,
                "log": log,
            }),
        )
        .await
        .unwrap();

        stale_mock.expect_called(0);
        let stored = store
            .get_delivery_log("log_refresh")
            .await
            .unwrap()
            .expect("delivery log should exist");
        assert_eq!(stored.response_status, None);
        assert_eq!(
            stored.last_error.as_deref(),
            Some("Subscription is disabled")
        );
    }

    /// With no global job client installed, `dispatch` returns an error
    /// mentioning "not enqueued". It must also leave a replayable DLQ log and
    /// must not count the failure against the subscription.
    #[tokio::test]
    async fn dispatch_marks_log_dlq_when_fallback_enqueue_fails() {
        let _guard = crate::job::global_job_runtime_test_lock().lock().await;
        crate::job::clear_global_job_client();
        // Declared after the lock guard so it drops (and clears) before the
        // lock is released, including on panic.
        let _reset_job_client = ClearGlobalJobClientOnDrop;

        let state = AppState::for_test();
        let store = Arc::new(InMemoryOutboundWebhookHandler::new());
        let manager = WebhookOutboundManager::new(store.clone()).with_initial_backoff_ms(1);
        let sub = sample_subscription(
            "sub_enqueue_missing",
            "http://mock-receiver/webhooks/enqueue-missing",
            WebhookSubscriptionStatus::Active,
        );
        store.create_subscription(sub).await.unwrap();

        let err = manager
            .dispatch(&state, "orders.created", &serde_json::json!({ "id": 42 }))
            .await
            .expect_err("dispatch should report the missing fallback job runtime");
        assert!(
            err.to_string().contains("not enqueued"),
            "error should describe the enqueue failure: {err}"
        );

        let logs = store.get_delivery_logs().await.unwrap();
        assert_eq!(logs.len(), 1);
        let log = &logs[0];
        assert!(
            log.is_dlq,
            "enqueue failure must leave a replayable DLQ record"
        );
        assert!(
            log.last_error
                .as_deref()
                .is_some_and(|msg| msg.contains("not enqueued")),
            "DLQ log should record enqueue failure: {:?}",
            log.last_error
        );
        assert_eq!(log.response_status, None);

        let sub = store
            .get_subscription("sub_enqueue_missing")
            .await
            .unwrap()
            .expect("subscription should remain stored");
        assert_eq!(sub.consecutive_failures, 0);
    }

    /// A response body larger than `MAX_LOGGED_RESPONSE_BODY_BYTES` is stored
    /// capped to that size and ends with the module's truncation suffix.
    #[tokio::test]
    async fn delivery_log_response_body_is_capped() {
        let state = AppState::for_test();
        let store = Arc::new(InMemoryOutboundWebhookHandler::new());
        let registry = Arc::new(MockRegistry::new());
        let large_body = "x".repeat(MAX_LOGGED_RESPONSE_BODY_BYTES + 1024);
        let _mock = mock_builder(
            registry.clone(),
            "http://mock-receiver/webhooks/large-error",
        )
        .post("/webhooks/large-error")
        .respond_with(500, serde_json::json!({ "error": large_body }));
        state.insert_extension(HttpMockRegistryExt(registry));
        install_outbound_webhook_manager(&state, store.clone(), 1);

        let sub = sample_subscription(
            "sub_large_error",
            "http://mock-receiver/webhooks/large-error",
            WebhookSubscriptionStatus::Active,
        );
        store.create_subscription(sub.clone()).await.unwrap();
        let mut log = sample_log("log_large_error", "sub_large_error");
        log.max_attempts = 1;

        deliver_webhook_job(
            state,
            serde_json::json!({
                "subscription": sub,
                "log": log,
            }),
        )
        .await
        .unwrap();

        let stored = store
            .get_delivery_log("log_large_error")
            .await
            .unwrap()
            .expect("delivery log should exist");
        let body = stored
            .response_body
            .expect("response body should be logged");
        assert!(
            body.len() <= MAX_LOGGED_RESPONSE_BODY_BYTES,
            "stored response body should be capped, got {} bytes",
            body.len()
        );
        // Tie the check to the module constant so the test cannot drift from
        // the suffix the implementation appends.
        assert!(
            body.ends_with(TRUNCATED_RESPONSE_BODY_SUFFIX),
            "capped body should end with the truncation suffix"
        );
    }

    /// Stub store that counts `log_delivery` calls. Every other method returns
    /// an empty, `None`, or `Ok(())` result without touching any state.
    struct CountingReplacementStore {
        log_delivery_calls: AtomicUsize,
    }

    impl CountingReplacementStore {
        fn new() -> Self {
            Self {
                log_delivery_calls: AtomicUsize::new(0),
            }
        }

        /// Number of `log_delivery` calls observed so far.
        fn log_delivery_count(&self) -> usize {
            self.log_delivery_calls.load(Ordering::SeqCst)
        }
    }

    impl OutboundWebhookHandler for CountingReplacementStore {
        fn get_subscriptions(
            &self,
            _topic: &str,
        ) -> Pin<Box<dyn Future<Output = AutumnResult<Vec<WebhookSubscription>>> + Send>> {
            Box::pin(async { Ok(Vec::new()) })
        }

        fn log_delivery(
            &self,
            _log: WebhookDeliveryLog,
        ) -> Pin<Box<dyn Future<Output = AutumnResult<()>> + Send>> {
            // Counted eagerly, when the future is created rather than when it is polled.
            self.log_delivery_calls.fetch_add(1, Ordering::SeqCst);
            Box::pin(async { Ok(()) })
        }

        fn replace_delivery_log(
            &self,
            _log: WebhookDeliveryLog,
        ) -> Pin<Box<dyn Future<Output = AutumnResult<()>> + Send>> {
            Box::pin(async { Ok(()) })
        }

        fn get_subscription(
            &self,
            _id: &str,
        ) -> Pin<Box<dyn Future<Output = AutumnResult<Option<WebhookSubscription>>> + Send>>
        {
            Box::pin(async { Ok(None) })
        }

        fn get_delivery_log(
            &self,
            _id: &str,
        ) -> Pin<Box<dyn Future<Output = AutumnResult<Option<WebhookDeliveryLog>>> + Send>>
        {
            Box::pin(async { Ok(None) })
        }
    }

    /// Replacing a DLQ'd, failed delivery log must not be routed through
    /// `log_delivery`.
    #[tokio::test]
    async fn replace_delivery_log_is_not_a_delivery_outcome() {
        // NOTE(review): `CountingReplacementStore` overrides
        // `replace_delivery_log` itself, so this only exercises the stub. If the
        // intent is to check a default `replace_delivery_log` on the trait (if
        // one exists), the stub must not override it. Confirm against the trait.
        let store = CountingReplacementStore::new();
        let mut log = sample_log("log_replace", "sub_replace");
        log.response_status = Some(500);
        log.last_error = Some("server returned status: 500 Internal Server Error".to_owned());
        log.is_dlq = true;

        store.replace_delivery_log(log).await.unwrap();

        assert_eq!(
            store.log_delivery_count(),
            0,
            "plain delivery-log replacement must not call log_delivery"
        );
    }

    /// Store that delegates to an in-memory store, except that
    /// `reset_subscription_failures` always fails.
    struct ResetFailingStore {
        inner: InMemoryOutboundWebhookHandler,
    }

    impl ResetFailingStore {
        fn new() -> Self {
            Self {
                inner: InMemoryOutboundWebhookHandler::new(),
            }
        }

        /// Seeds the inner store; panics if the insert fails.
        async fn create_subscription(&self, sub: WebhookSubscription) {
            self.inner.create_subscription(sub).await.unwrap();
        }

        /// Fetches a stored delivery log; panics if it is missing.
        async fn delivery_log(&self, id: &str) -> WebhookDeliveryLog {
            self.inner
                .get_delivery_log(id)
                .await
                .unwrap()
                .expect("delivery log should exist")
        }
    }

    impl OutboundWebhookHandler for ResetFailingStore {
        fn get_subscriptions(
            &self,
            topic: &str,
        ) -> Pin<Box<dyn Future<Output = AutumnResult<Vec<WebhookSubscription>>> + Send>> {
            <InMemoryOutboundWebhookHandler as OutboundWebhookHandler>::get_subscriptions(
                &self.inner,
                topic,
            )
        }

        fn log_delivery(
            &self,
            log: WebhookDeliveryLog,
        ) -> Pin<Box<dyn Future<Output = AutumnResult<()>> + Send>> {
            <InMemoryOutboundWebhookHandler as OutboundWebhookHandler>::log_delivery(
                &self.inner,
                log,
            )
        }

        fn replace_delivery_log(
            &self,
            log: WebhookDeliveryLog,
        ) -> Pin<Box<dyn Future<Output = AutumnResult<()>> + Send>> {
            <InMemoryOutboundWebhookHandler as OutboundWebhookHandler>::replace_delivery_log(
                &self.inner,
                log,
            )
        }

        fn get_subscription(
            &self,
            id: &str,
        ) -> Pin<Box<dyn Future<Output = AutumnResult<Option<WebhookSubscription>>> + Send>>
        {
            <InMemoryOutboundWebhookHandler as OutboundWebhookHandler>::get_subscription(
                &self.inner,
                id,
            )
        }

        fn get_delivery_log(
            &self,
            id: &str,
        ) -> Pin<Box<dyn Future<Output = AutumnResult<Option<WebhookDeliveryLog>>> + Send>>
        {
            <InMemoryOutboundWebhookHandler as OutboundWebhookHandler>::get_delivery_log(
                &self.inner,
                id,
            )
        }

        fn reset_subscription_failures(
            &self,
            _id: &str,
        ) -> Pin<Box<dyn Future<Output = AutumnResult<()>> + Send>> {
            Box::pin(async {
                Err(AutumnError::internal_server_error_msg(
                    "reset backend unavailable",
                ))
            })
        }
    }

    /// A 200 response makes the job succeed and records the response, even
    /// though resetting the subscription's failure counter errors.
    #[tokio::test]
    async fn successful_delivery_does_not_retry_when_failure_reset_fails() {
        let state = AppState::for_test();
        let store = Arc::new(ResetFailingStore::new());
        let registry = Arc::new(MockRegistry::new());
        let mock = mock_builder(registry.clone(), "http://mock-receiver/webhooks/success")
            .post("/webhooks/success")
            .respond_with(200, serde_json::json!({ "received": true }));
        state.insert_extension(HttpMockRegistryExt(registry));
        install_outbound_webhook_manager(&state, store.clone(), 1);

        let sub = sample_subscription(
            "sub_success",
            "http://mock-receiver/webhooks/success",
            WebhookSubscriptionStatus::Active,
        );
        store.create_subscription(sub.clone()).await;
        let log = sample_log("log_success", "sub_success");

        deliver_webhook_job(
            state,
            serde_json::json!({
                "subscription": sub,
                "log": log,
            }),
        )
        .await
        .expect("accepted webhook delivery must not be retried because counter reset failed");

        mock.expect_called(1);
        let persisted = store.delivery_log("log_success").await;
        assert_eq!(persisted.response_status, Some(200));
        assert!(persisted.last_error.is_none());
    }

    /// A subscription whose `target_url` is an alias from
    /// `http.client.base_urls` is delivered through the configured client.
    #[tokio::test]
    async fn webhook_manager_uses_http_client_config_base_urls() {
        let _guard = crate::job::global_job_runtime_test_lock().lock().await;
        crate::job::clear_global_job_client();
        // Replaces a trailing explicit clear: the global client is now reset
        // even if an assertion below panics. Declared after the lock guard so
        // it drops before the lock is released.
        let _reset_job_client = ClearGlobalJobClientOnDrop;

        let store = Arc::new(InMemoryOutboundWebhookHandler::new());
        let plugin = OutboundWebhookPlugin::new(store.clone()).with_initial_backoff_ms(1);
        let mut config = crate::config::AutumnConfig::default();
        config.http.client.base_urls.insert(
            "hook-service".to_owned(),
            "http://mock-receiver/base".to_owned(),
        );

        let mut app_builder = crate::test::TestApp::new().config(config).plugin(plugin);
        let mock = app_builder
            .http_mock("hook-service")
            .post("/base/hook-service")
            .respond_with(200, serde_json::json!({ "received": true }));
        let app = app_builder.build();
        let state = app.state();

        let sub = sample_subscription(
            "sub_config",
            "hook-service",
            WebhookSubscriptionStatus::Active,
        );
        store.create_subscription(sub.clone()).await.unwrap();
        let log = sample_log("log_config", "sub_config");

        deliver_webhook_job(
            state.clone(),
            serde_json::json!({
                "subscription": sub,
                "log": log,
            }),
        )
        .await
        .unwrap();

        mock.expect_called(1);
    }
}
1323
1324        mock.expect_called(1);
1325        crate::job::clear_global_job_client();
1326    }
1327}