mail_laser/webhook/
mod.rs1use crate::config::Config;
2use acton_reactive::prelude::*;
3use anyhow::Result;
4use bytes::Bytes;
5use http_body_util::Full;
6use hyper::Request;
7use hyper_rustls::HttpsConnectorBuilder;
8use hyper_util::{
9 client::legacy::{connect::HttpConnector, Client},
10 rt::TokioExecutor,
11};
12use log::{error, info};
13use serde::{Deserialize, Serialize};
14use std::collections::HashMap;
15use std::sync::Arc;
16use std::time::Duration;
17
18type HttpsConn = hyper_rustls::HttpsConnector<HttpConnector>;
19type WebhookHttpClient = Client<HttpsConn, Full<Bytes>>;
20
21#[acton_message]
24pub struct ForwardEmail {
25 pub payload: EmailPayload,
26}
27
28#[acton_message]
29struct WebhookResult {
30 success: bool,
31 #[allow(dead_code)] sender_info: String,
33}
34
35#[derive(Debug, Clone, Serialize, Deserialize)]
38pub struct EmailPayload {
39 pub sender: String,
40 #[serde(skip_serializing_if = "Option::is_none")]
41 pub sender_name: Option<String>,
42 pub recipient: String,
43 pub subject: String,
44 pub body: String,
45 #[serde(skip_serializing_if = "Option::is_none")]
46 pub html_body: Option<String>,
47 #[serde(skip_serializing_if = "Option::is_none")]
48 pub headers: Option<HashMap<String, String>>,
49}
50
51pub struct WebhookClient {
54 config: Config,
55 client: WebhookHttpClient,
56 user_agent: String,
57}
58
59impl WebhookClient {
60 pub fn new(config: Config) -> Self {
61 let https = {
62 let connector = HttpsConnectorBuilder::new()
63 .with_native_roots()
64 .expect("Failed to load native root certificates for hyper-rustls");
65 #[cfg(debug_assertions)]
66 let connector = connector.https_or_http();
67 #[cfg(not(debug_assertions))]
68 let connector = connector.https_only();
69 connector.enable_http1().build()
70 };
71
72 let client: WebhookHttpClient = Client::builder(TokioExecutor::new()).build(https);
73
74 let user_agent = format!("{}/{}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION"));
75
76 Self {
77 config,
78 client,
79 user_agent,
80 }
81 }
82
83 pub async fn forward_email(&self, email: EmailPayload) -> Result<()> {
84 info!(
85 "Forwarding email from sender '{}' (Name: {}) with subject: '{}'",
86 email.sender,
87 email.sender_name.as_deref().unwrap_or("N/A"),
88 email.subject
89 );
90
91 let json_body = serde_json::to_string(&email)?;
92
93 let request = Request::builder()
94 .method(hyper::Method::POST)
95 .uri(&self.config.webhook_url)
96 .header("content-type", "application/json")
97 .header("user-agent", &self.user_agent)
98 .body(Full::new(Bytes::from(json_body)))?;
99
100 let response = self.client.request(request).await?;
101
102 let status = response.status();
103 if !status.is_success() {
104 let msg = format!(
105 "Webhook request to {} failed with status: {}",
106 self.config.webhook_url, status
107 );
108 error!("{}", msg);
109 return Err(anyhow::anyhow!(msg));
110 }
111
112 info!(
113 "Email successfully forwarded to webhook {}, status: {}",
114 self.config.webhook_url, status
115 );
116
117 Ok(())
118 }
119}
120
121fn current_time_ms() -> u64 {
124 std::time::SystemTime::now()
125 .duration_since(std::time::UNIX_EPOCH)
126 .unwrap_or_default()
127 .as_millis() as u64
128}
129
130#[acton_actor]
131pub struct WebhookState {
132 consecutive_failures: u32,
133 circuit_open: bool,
134 circuit_opened_at_ms: u64,
135 total_forwarded: u64,
136 total_failed: u64,
137 webhook_timeout_secs: u64,
138 max_retries: u32,
139 circuit_threshold: u32,
140 circuit_reset_secs: u64,
141}
142
143impl WebhookState {
144 pub async fn create(
145 runtime: &mut ActorRuntime,
146 config: &Config,
147 ) -> anyhow::Result<ActorHandle> {
148 let actor_config = ActorConfig::new(Ern::with_root("webhook-dispatcher")?, None, None)?
149 .with_restart_policy(RestartPolicy::Permanent);
150
151 let mut builder = runtime.new_actor_with_config::<Self>(actor_config);
152
153 builder.model.webhook_timeout_secs = config.webhook_timeout_secs;
154 builder.model.max_retries = config.webhook_max_retries;
155 builder.model.circuit_threshold = config.circuit_breaker_threshold;
156 builder.model.circuit_reset_secs = config.circuit_breaker_reset_secs;
157
158 let client = Arc::new(WebhookClient::new(config.clone()));
159
160 builder.mutate_on::<ForwardEmail>(move |actor, ctx| {
162 let client = client.clone();
163 let payload = ctx.message().payload.clone();
164 let timeout_secs = actor.model.webhook_timeout_secs;
165 let max_retries = actor.model.max_retries;
166 let sender_info = payload.sender.clone();
167
168 if actor.model.circuit_open {
170 let elapsed = current_time_ms() - actor.model.circuit_opened_at_ms;
171 if elapsed > actor.model.circuit_reset_secs * 1000 {
172 actor.model.circuit_open = false;
173 actor.model.consecutive_failures = 0;
174 tracing::info!("Circuit breaker half-open, allowing request");
175 } else {
176 tracing::warn!("Circuit breaker OPEN, dropping email from {}", sender_info);
177 actor.model.total_failed += 1;
178 return Reply::ready();
179 }
180 }
181
182 let self_handle = actor.handle().clone();
183
184 Reply::pending(async move {
185 let mut success = false;
186 for attempt in 0..=max_retries {
187 if attempt > 0 {
188 let backoff_ms = 100 * 2u64.pow(attempt - 1);
189 tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
190 tracing::info!("Retry attempt {} for email from {}", attempt, sender_info);
191 }
192
193 let result = tokio::time::timeout(
194 Duration::from_secs(timeout_secs),
195 client.forward_email(payload.clone()),
196 )
197 .await;
198
199 match result {
200 Ok(Ok(())) => {
201 success = true;
202 break;
203 }
204 Ok(Err(e)) => {
205 tracing::warn!("Webhook attempt {} failed: {:#}", attempt + 1, e);
206 }
207 Err(_) => {
208 tracing::warn!(
209 "Webhook attempt {} timed out ({}s)",
210 attempt + 1,
211 timeout_secs
212 );
213 }
214 }
215 }
216
217 if !success {
218 tracing::error!(
219 "Webhook delivery failed after {} retries for {}",
220 max_retries,
221 sender_info
222 );
223 }
224
225 self_handle
226 .send(WebhookResult {
227 success,
228 sender_info,
229 })
230 .await;
231 })
232 });
233
234 builder.mutate_on::<WebhookResult>(|actor, ctx| {
236 let result = ctx.message();
237 if result.success {
238 actor.model.consecutive_failures = 0;
239 actor.model.total_forwarded += 1;
240 } else {
241 actor.model.consecutive_failures += 1;
242 actor.model.total_failed += 1;
243 if actor.model.consecutive_failures >= actor.model.circuit_threshold {
244 actor.model.circuit_open = true;
245 actor.model.circuit_opened_at_ms = current_time_ms();
246 tracing::error!(
247 "Circuit breaker OPENED after {} consecutive failures",
248 actor.model.consecutive_failures
249 );
250 }
251 }
252 Reply::ready()
253 });
254
255 builder.after_stop(|actor| {
256 tracing::info!(
257 "WebhookActor stopped. Forwarded: {}, Failed: {}",
258 actor.model.total_forwarded,
259 actor.model.total_failed
260 );
261 Reply::ready()
262 });
263
264 Ok(builder.start().await)
265 }
266}
267
268#[cfg(test)]
269mod tests;