Skip to main content

mail_laser/webhook/
mod.rs

1use crate::config::Config;
2use acton_reactive::prelude::*;
3use anyhow::Result;
4use bytes::Bytes;
5use http_body_util::Full;
6use hyper::Request;
7use hyper_rustls::HttpsConnectorBuilder;
8use hyper_util::{
9    client::legacy::{connect::HttpConnector, Client},
10    rt::TokioExecutor,
11};
12use log::{error, info};
13use serde::{Deserialize, Serialize};
14use std::collections::HashMap;
15use std::sync::Arc;
16use std::time::Duration;
17
18type HttpsConn = hyper_rustls::HttpsConnector<HttpConnector>;
19type WebhookHttpClient = Client<HttpsConn, Full<Bytes>>;
20
21// --- Message types ---
22
23#[acton_message]
24pub struct ForwardEmail {
25    pub payload: EmailPayload,
26}
27
28#[acton_message]
29struct WebhookResult {
30    success: bool,
31    #[allow(dead_code)] // read via ctx.message() in actor handler
32    sender_info: String,
33}
34
35// --- Public data structures ---
36
37#[derive(Debug, Clone, Serialize, Deserialize)]
38pub struct EmailPayload {
39    pub sender: String,
40    #[serde(skip_serializing_if = "Option::is_none")]
41    pub sender_name: Option<String>,
42    pub recipient: String,
43    pub subject: String,
44    pub body: String,
45    #[serde(skip_serializing_if = "Option::is_none")]
46    pub html_body: Option<String>,
47    #[serde(skip_serializing_if = "Option::is_none")]
48    pub headers: Option<HashMap<String, String>>,
49}
50
51// --- WebhookClient (unchanged transport layer) ---
52
53pub struct WebhookClient {
54    config: Config,
55    client: WebhookHttpClient,
56    user_agent: String,
57}
58
59impl WebhookClient {
60    pub fn new(config: Config) -> Self {
61        let https = {
62            let connector = HttpsConnectorBuilder::new()
63                .with_native_roots()
64                .expect("Failed to load native root certificates for hyper-rustls");
65            #[cfg(debug_assertions)]
66            let connector = connector.https_or_http();
67            #[cfg(not(debug_assertions))]
68            let connector = connector.https_only();
69            connector.enable_http1().build()
70        };
71
72        let client: WebhookHttpClient = Client::builder(TokioExecutor::new()).build(https);
73
74        let user_agent = format!("{}/{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION"));
75
76        Self {
77            config,
78            client,
79            user_agent,
80        }
81    }
82
83    pub async fn forward_email(&self, email: EmailPayload) -> Result<()> {
84        info!(
85            "Forwarding email from sender '{}' (Name: {}) with subject: '{}'",
86            email.sender,
87            email.sender_name.as_deref().unwrap_or("N/A"),
88            email.subject
89        );
90
91        let json_body = serde_json::to_string(&email)?;
92
93        let request = Request::builder()
94            .method(hyper::Method::POST)
95            .uri(&self.config.webhook_url)
96            .header("content-type", "application/json")
97            .header("user-agent", &self.user_agent)
98            .body(Full::new(Bytes::from(json_body)))?;
99
100        let response = self.client.request(request).await?;
101
102        let status = response.status();
103        if !status.is_success() {
104            let msg = format!(
105                "Webhook request to {} failed with status: {}",
106                self.config.webhook_url, status
107            );
108            error!("{}", msg);
109            return Err(anyhow::anyhow!(msg));
110        }
111
112        info!(
113            "Email successfully forwarded to webhook {}, status: {}",
114            self.config.webhook_url, status
115        );
116
117        Ok(())
118    }
119}
120
121// --- WebhookActor ---
122
123fn current_time_ms() -> u64 {
124    std::time::SystemTime::now()
125        .duration_since(std::time::UNIX_EPOCH)
126        .unwrap_or_default()
127        .as_millis() as u64
128}
129
130#[acton_actor]
131pub struct WebhookState {
132    consecutive_failures: u32,
133    circuit_open: bool,
134    circuit_opened_at_ms: u64,
135    total_forwarded: u64,
136    total_failed: u64,
137    webhook_timeout_secs: u64,
138    max_retries: u32,
139    circuit_threshold: u32,
140    circuit_reset_secs: u64,
141}
142
143impl WebhookState {
144    pub async fn create(
145        runtime: &mut ActorRuntime,
146        config: &Config,
147    ) -> anyhow::Result<ActorHandle> {
148        let actor_config = ActorConfig::new(Ern::with_root("webhook-dispatcher")?, None, None)?
149            .with_restart_policy(RestartPolicy::Permanent);
150
151        let mut builder = runtime.new_actor_with_config::<Self>(actor_config);
152
153        builder.model.webhook_timeout_secs = config.webhook_timeout_secs;
154        builder.model.max_retries = config.webhook_max_retries;
155        builder.model.circuit_threshold = config.circuit_breaker_threshold;
156        builder.model.circuit_reset_secs = config.circuit_breaker_reset_secs;
157
158        let client = Arc::new(WebhookClient::new(config.clone()));
159
160        // ForwardEmail handler: circuit breaker check + async delivery with timeout + retry
161        builder.mutate_on::<ForwardEmail>(move |actor, ctx| {
162            let client = client.clone();
163            let payload = ctx.message().payload.clone();
164            let timeout_secs = actor.model.webhook_timeout_secs;
165            let max_retries = actor.model.max_retries;
166            let sender_info = payload.sender.clone();
167
168            // Circuit breaker check (synchronous — can mutate state)
169            if actor.model.circuit_open {
170                let elapsed = current_time_ms() - actor.model.circuit_opened_at_ms;
171                if elapsed > actor.model.circuit_reset_secs * 1000 {
172                    actor.model.circuit_open = false;
173                    actor.model.consecutive_failures = 0;
174                    tracing::info!("Circuit breaker half-open, allowing request");
175                } else {
176                    tracing::warn!("Circuit breaker OPEN, dropping email from {}", sender_info);
177                    actor.model.total_failed += 1;
178                    return Reply::ready();
179                }
180            }
181
182            let self_handle = actor.handle().clone();
183
184            Reply::pending(async move {
185                let mut success = false;
186                for attempt in 0..=max_retries {
187                    if attempt > 0 {
188                        let backoff_ms = 100 * 2u64.pow(attempt - 1);
189                        tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
190                        tracing::info!("Retry attempt {} for email from {}", attempt, sender_info);
191                    }
192
193                    let result = tokio::time::timeout(
194                        Duration::from_secs(timeout_secs),
195                        client.forward_email(payload.clone()),
196                    )
197                    .await;
198
199                    match result {
200                        Ok(Ok(())) => {
201                            success = true;
202                            break;
203                        }
204                        Ok(Err(e)) => {
205                            tracing::warn!("Webhook attempt {} failed: {:#}", attempt + 1, e);
206                        }
207                        Err(_) => {
208                            tracing::warn!(
209                                "Webhook attempt {} timed out ({}s)",
210                                attempt + 1,
211                                timeout_secs
212                            );
213                        }
214                    }
215                }
216
217                if !success {
218                    tracing::error!(
219                        "Webhook delivery failed after {} retries for {}",
220                        max_retries,
221                        sender_info
222                    );
223                }
224
225                self_handle
226                    .send(WebhookResult {
227                        success,
228                        sender_info,
229                    })
230                    .await;
231            })
232        });
233
234        // WebhookResult handler: update circuit breaker state
235        builder.mutate_on::<WebhookResult>(|actor, ctx| {
236            let result = ctx.message();
237            if result.success {
238                actor.model.consecutive_failures = 0;
239                actor.model.total_forwarded += 1;
240            } else {
241                actor.model.consecutive_failures += 1;
242                actor.model.total_failed += 1;
243                if actor.model.consecutive_failures >= actor.model.circuit_threshold {
244                    actor.model.circuit_open = true;
245                    actor.model.circuit_opened_at_ms = current_time_ms();
246                    tracing::error!(
247                        "Circuit breaker OPENED after {} consecutive failures",
248                        actor.model.consecutive_failures
249                    );
250                }
251            }
252            Reply::ready()
253        });
254
255        builder.after_stop(|actor| {
256            tracing::info!(
257                "WebhookActor stopped. Forwarded: {}, Failed: {}",
258                actor.model.total_forwarded,
259                actor.model.total_failed
260            );
261            Reply::ready()
262        });
263
264        Ok(builder.start().await)
265    }
266}
267
268#[cfg(test)]
269mod tests;