Skip to main content

ferro_notifications/
dispatcher.rs

1//! Notification dispatcher for sending notifications through channels.
2
3use crate::channel::Channel;
4use crate::channels::{DatabaseMessage, InAppMessage, MailMessage, SlackMessage, WhatsAppMessage};
5use crate::notifiable::{DatabaseNotificationStore, Notifiable};
6use crate::notification::Notification;
7use crate::Error;
8use serde::Serialize;
9use std::env;
10use std::sync::{Arc, OnceLock};
11use tracing::{error, info, warn};
12
13/// Global notification dispatcher configuration.
14static CONFIG: OnceLock<NotificationConfig> = OnceLock::new();
15
16/// Configuration for the notification dispatcher.
17#[derive(Clone, Default)]
18pub struct NotificationConfig {
19    /// Mail configuration (supports SMTP and Resend drivers).
20    pub mail: Option<MailConfig>,
21    /// Slack webhook URL.
22    pub slack_webhook: Option<String>,
23    /// Enable the WhatsApp channel (per CONTEXT.md D-04).
24    ///
25    /// Defaults to `false`. When `true`, the dispatcher calls
26    /// [`ferro_whatsapp::WhatsApp::send`] which requires that
27    /// [`ferro_whatsapp::WhatsApp::init`] was called at app startup.
28    pub whatsapp_enabled: bool,
29    /// In-app SSE channel configuration (per CONTEXT.md D-07).
30    ///
31    /// When `Some`, `Channel::InApp` dispatches persist via `store` then publish via `broker`.
32    /// When `None`, `Channel::InApp` emits a structured "channel not configured" no-op.
33    pub in_app: Option<InAppConfig>,
34    /// Database notification store (per CONTEXT.md D-13, closes ARCH-FINDING-02).
35    ///
36    /// When `Some`, `Channel::Database` calls `store.store(...)` instead of the placeholder log.
37    /// When `None`, the existing placeholder behavior is preserved (backward-compat).
38    /// Note: `InAppConfig.store` is independent of this field — they may point to the same Arc
39    /// or to different stores.
40    pub database_store: Option<Arc<dyn DatabaseNotificationStore>>,
41}
42
43/// In-app SSE channel configuration.
44///
45/// Combines a `Broadcaster` for real-time fanout and a `DatabaseNotificationStore`
46/// for persistence. Per CONTEXT.md D-08, `Channel::InApp` dispatches write the DB-store
47/// leg first, then publish to the broadcast channel `format!("user.{}", notifiable_id)`.
48#[derive(Clone)]
49pub struct InAppConfig {
50    /// Broadcaster handle for SSE / WebSocket fanout.
51    pub broker: Arc<ferro_broadcast::Broadcaster>,
52    /// Persistence store — typically the same Arc as `NotificationConfig::database_store`.
53    pub store: Arc<dyn DatabaseNotificationStore>,
54}
55
/// Mail transport driver.
///
/// A field-less selector, so it is `Copy` and comparable with `==`;
/// the default driver is [`MailDriver::Smtp`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum MailDriver {
    /// SMTP via lettre (default).
    #[default]
    Smtp,
    /// Resend HTTP API.
    Resend,
}
65
/// Connection settings for the SMTP mail driver.
#[derive(Clone)]
pub struct SmtpConfig {
    /// Host name of the SMTP server.
    pub host: String,
    /// TCP port of the SMTP server.
    pub port: u16,
    /// Login user name; credentials are sent only when a password is set as well.
    pub username: Option<String>,
    /// Login password; credentials are sent only when a user name is set as well.
    pub password: Option<String>,
    /// Whether to connect with TLS.
    pub tls: bool,
}
80
/// Settings for the Resend HTTP mail driver.
#[derive(Clone)]
pub struct ResendConfig {
    /// API key sent as the bearer token on Resend requests.
    pub api_key: String,
}
87
88/// Mail configuration supporting multiple drivers.
89#[derive(Clone)]
90pub struct MailConfig {
91    /// Which driver to use.
92    pub driver: MailDriver,
93    /// Default from address (shared across all drivers).
94    pub from: String,
95    /// Default from name (shared across all drivers).
96    pub from_name: Option<String>,
97    /// SMTP-specific config (only when driver = Smtp).
98    pub smtp: Option<SmtpConfig>,
99    /// Resend-specific config (only when driver = Resend).
100    pub resend: Option<ResendConfig>,
101}
102
103impl NotificationConfig {
104    /// Create a new notification config.
105    pub fn new() -> Self {
106        Self::default()
107    }
108
109    /// Create configuration from environment variables.
110    ///
111    /// Reads the following environment variables:
112    /// - Mail: `MAIL_HOST`, `MAIL_PORT`, `MAIL_USERNAME`, `MAIL_PASSWORD`,
113    ///   `MAIL_FROM_ADDRESS`, `MAIL_FROM_NAME`, `MAIL_ENCRYPTION`
114    /// - Slack: `SLACK_WEBHOOK_URL`
115    ///
116    /// # Example
117    ///
118    /// ```rust,ignore
119    /// use ferro_notifications::NotificationConfig;
120    ///
121    /// // In bootstrap.rs
122    /// let config = NotificationConfig::from_env();
123    /// NotificationDispatcher::configure(config);
124    /// ```
125    pub fn from_env() -> Self {
126        Self {
127            mail: MailConfig::from_env(),
128            slack_webhook: env::var("SLACK_WEBHOOK_URL").ok().filter(|s| !s.is_empty()),
129            whatsapp_enabled: env::var("WHATSAPP_ENABLED")
130                .ok()
131                .and_then(|v| v.parse::<bool>().ok())
132                .unwrap_or(false),
133            // `in_app` and `database_store` require typed handles (per CONTEXT.md D-14)
134            // — they are not env-driven.
135            in_app: None,
136            database_store: None,
137        }
138    }
139
140    /// Set the mail configuration.
141    pub fn mail(mut self, config: MailConfig) -> Self {
142        self.mail = Some(config);
143        self
144    }
145
146    /// Set the Slack webhook URL.
147    pub fn slack_webhook(mut self, url: impl Into<String>) -> Self {
148        self.slack_webhook = Some(url.into());
149        self
150    }
151
152    /// Enable or disable the WhatsApp channel.
153    pub fn with_whatsapp_enabled(mut self, enabled: bool) -> Self {
154        self.whatsapp_enabled = enabled;
155        self
156    }
157
158    /// Set the in-app channel configuration (per CONTEXT.md D-07).
159    pub fn with_in_app(mut self, config: InAppConfig) -> Self {
160        self.in_app = Some(config);
161        self
162    }
163
164    /// Set the database notification store (per CONTEXT.md D-13).
165    ///
166    /// When configured, `Channel::Database` dispatches call `store.store(...)`
167    /// instead of the placeholder log path.
168    pub fn with_database_store(mut self, store: Arc<dyn DatabaseNotificationStore>) -> Self {
169        self.database_store = Some(store);
170        self
171    }
172}
173
174impl MailConfig {
175    /// Create a new SMTP mail config (backwards compatible).
176    pub fn new(host: impl Into<String>, port: u16, from: impl Into<String>) -> Self {
177        Self {
178            driver: MailDriver::Smtp,
179            from: from.into(),
180            from_name: None,
181            smtp: Some(SmtpConfig {
182                host: host.into(),
183                port,
184                username: None,
185                password: None,
186                tls: true,
187            }),
188            resend: None,
189        }
190    }
191
192    /// Create a new Resend mail config.
193    pub fn resend(api_key: impl Into<String>, from: impl Into<String>) -> Self {
194        Self {
195            driver: MailDriver::Resend,
196            from: from.into(),
197            from_name: None,
198            smtp: None,
199            resend: Some(ResendConfig {
200                api_key: api_key.into(),
201            }),
202        }
203    }
204
205    /// Create mail configuration from environment variables.
206    ///
207    /// Returns `None` if required variables are missing.
208    ///
209    /// Reads the following environment variables:
210    /// - `MAIL_DRIVER`: "smtp" (default) or "resend"
211    /// - `MAIL_FROM_ADDRESS`: Default from email address (required for all drivers)
212    /// - `MAIL_FROM_NAME`: Default from name (optional)
213    ///
214    /// SMTP driver variables:
215    /// - `MAIL_HOST`: SMTP server host (required)
216    /// - `MAIL_PORT`: SMTP server port (default: 587)
217    /// - `MAIL_USERNAME`: SMTP username (optional)
218    /// - `MAIL_PASSWORD`: SMTP password (optional)
219    /// - `MAIL_ENCRYPTION`: "tls" or "none" (default: "tls")
220    ///
221    /// Resend driver variables:
222    /// - `RESEND_API_KEY`: Resend API key (required)
223    ///
224    /// # Example
225    ///
226    /// ```rust,ignore
227    /// use ferro_notifications::MailConfig;
228    ///
229    /// if let Some(config) = MailConfig::from_env() {
230    ///     // Mail is configured
231    /// }
232    /// ```
233    pub fn from_env() -> Option<Self> {
234        let from = env::var("MAIL_FROM_ADDRESS")
235            .ok()
236            .filter(|s| !s.is_empty())?;
237        let from_name = env::var("MAIL_FROM_NAME").ok().filter(|s| !s.is_empty());
238
239        let driver_str = env::var("MAIL_DRIVER")
240            .ok()
241            .filter(|s| !s.is_empty())
242            .unwrap_or_else(|| "smtp".into());
243
244        match driver_str.to_lowercase().as_str() {
245            "resend" => {
246                let api_key = env::var("RESEND_API_KEY").ok().filter(|s| !s.is_empty())?;
247
248                Some(Self {
249                    driver: MailDriver::Resend,
250                    from,
251                    from_name,
252                    smtp: None,
253                    resend: Some(ResendConfig { api_key }),
254                })
255            }
256            _ => {
257                // Default: SMTP (backwards compatible)
258                let host = env::var("MAIL_HOST").ok().filter(|s| !s.is_empty())?;
259
260                let port = env::var("MAIL_PORT")
261                    .ok()
262                    .and_then(|p| p.parse().ok())
263                    .unwrap_or(587);
264
265                let username = env::var("MAIL_USERNAME").ok().filter(|s| !s.is_empty());
266                let password = env::var("MAIL_PASSWORD").ok().filter(|s| !s.is_empty());
267
268                let tls = env::var("MAIL_ENCRYPTION")
269                    .map(|v| v.to_lowercase() != "none")
270                    .unwrap_or(true);
271
272                Some(Self {
273                    driver: MailDriver::Smtp,
274                    from,
275                    from_name,
276                    smtp: Some(SmtpConfig {
277                        host,
278                        port,
279                        username,
280                        password,
281                        tls,
282                    }),
283                    resend: None,
284                })
285            }
286        }
287    }
288
289    /// Set SMTP credentials.
290    ///
291    /// No-ops with a warning when the driver is not [`MailDriver::Smtp`] — calling
292    /// `credentials(...)` on a Resend config is almost certainly a caller mistake
293    /// and should not silently mutate the config into an SMTP shape.
294    pub fn credentials(mut self, username: impl Into<String>, password: impl Into<String>) -> Self {
295        if !matches!(self.driver, MailDriver::Smtp) {
296            warn!("MailConfig::credentials called on non-SMTP driver; ignoring");
297            return self;
298        }
299        let smtp = self.smtp.get_or_insert_with(|| SmtpConfig {
300            host: String::new(),
301            port: 587,
302            username: None,
303            password: None,
304            tls: true,
305        });
306        smtp.username = Some(username.into());
307        smtp.password = Some(password.into());
308        self
309    }
310
311    /// Set the from name.
312    pub fn from_name(mut self, name: impl Into<String>) -> Self {
313        self.from_name = Some(name.into());
314        self
315    }
316
317    /// Disable TLS (SMTP only).
318    pub fn no_tls(mut self) -> Self {
319        if let Some(ref mut smtp) = self.smtp {
320            smtp.tls = false;
321        }
322        self
323    }
324}
325
326/// Resend API attachment payload.
327#[derive(Serialize)]
328struct ResendAttachment {
329    filename: String,
330    /// Base64-encoded attachment content (standard alphabet, not URL-safe).
331    content: String,
332}
333
334/// Resend API email payload.
335#[derive(Serialize)]
336struct ResendEmailPayload {
337    from: String,
338    to: Vec<String>,
339    subject: String,
340    #[serde(skip_serializing_if = "Option::is_none")]
341    html: Option<String>,
342    #[serde(skip_serializing_if = "Option::is_none")]
343    text: Option<String>,
344    #[serde(skip_serializing_if = "Vec::is_empty")]
345    cc: Vec<String>,
346    #[serde(skip_serializing_if = "Vec::is_empty")]
347    bcc: Vec<String>,
348    #[serde(skip_serializing_if = "Option::is_none")]
349    reply_to: Option<String>,
350    #[serde(skip_serializing_if = "Vec::is_empty")]
351    attachments: Vec<ResendAttachment>,
352}
353
/// Stateless entry point for sending notifications.
///
/// The type carries no data: its functions are associated functions, and the
/// configuration lives in the module-level `CONFIG` cell.
pub struct NotificationDispatcher;
356
357impl NotificationDispatcher {
358    /// Configure the global notification dispatcher.
359    pub fn configure(config: NotificationConfig) {
360        let _ = CONFIG.set(config);
361    }
362
363    /// Get the current configuration.
364    pub fn config() -> Option<&'static NotificationConfig> {
365        CONFIG.get()
366    }
367
368    /// Send a notification to a notifiable entity.
369    pub async fn send<N, T>(notifiable: &N, notification: T) -> Result<(), Error>
370    where
371        N: Notifiable + ?Sized,
372        T: Notification,
373    {
374        let channels = notification.via();
375        let notification_type = notification.notification_type();
376
377        info!(
378            notification = notification_type,
379            channels = ?channels,
380            "Dispatching notification"
381        );
382
383        for channel in channels {
384            match channel {
385                Channel::Mail => {
386                    if let Some(mail) = notification.to_mail() {
387                        Self::send_mail(notifiable, &mail).await?;
388                    }
389                }
390                Channel::Database => {
391                    if let Some(db_msg) = notification.to_database() {
392                        Self::send_database(notifiable, &db_msg).await?;
393                    }
394                }
395                Channel::Slack => {
396                    if let Some(slack) = notification.to_slack() {
397                        Self::send_slack(notifiable, &slack).await?;
398                    }
399                }
400                Channel::WhatsApp => {
401                    if let Some(wa) = notification.to_whatsapp() {
402                        Self::send_whatsapp(notifiable, &wa).await?;
403                    }
404                }
405                Channel::InApp => {
406                    if let Some(in_app) = notification.to_in_app() {
407                        Self::send_in_app(notifiable, &in_app).await?;
408                    }
409                }
410                Channel::Sms | Channel::Push => {
411                    // Per ARCH-FINDING-03 — not implemented in this phase.
412                    info!(channel = %channel, "Channel not implemented");
413                }
414            }
415        }
416
417        Ok(())
418    }
419
420    /// Send a mail notification, dispatching to the configured driver.
421    async fn send_mail<N: Notifiable + ?Sized>(
422        notifiable: &N,
423        message: &MailMessage,
424    ) -> Result<(), Error> {
425        let to = notifiable
426            .route_notification_for(Channel::Mail)
427            .ok_or_else(|| Error::ChannelNotAvailable("No mail route configured".into()))?;
428
429        let config = CONFIG
430            .get()
431            .and_then(|c| c.mail.as_ref())
432            .ok_or_else(|| Error::ChannelNotAvailable("Mail not configured".into()))?;
433
434        info!(to = %to, subject = %message.subject, "Sending mail notification");
435
436        match config.driver {
437            MailDriver::Smtp => Self::send_mail_smtp(&to, message, config).await,
438            MailDriver::Resend => Self::send_mail_resend(&to, message, config).await,
439        }
440    }
441
442    /// Send mail via SMTP using lettre.
443    async fn send_mail_smtp(
444        to: &str,
445        message: &MailMessage,
446        config: &MailConfig,
447    ) -> Result<(), Error> {
448        let smtp = config
449            .smtp
450            .as_ref()
451            .ok_or_else(|| Error::mail("SMTP config missing for SMTP driver"))?;
452
453        use lettre::message::{header::ContentType, Attachment, Mailbox, MultiPart, SinglePart};
454        use lettre::transport::smtp::authentication::Credentials;
455        use lettre::{AsyncSmtpTransport, AsyncTransport, Message, Tokio1Executor};
456
457        let effective_from_name = message.from_name.as_ref().or(config.from_name.as_ref());
458        let from: Mailbox = if let Some(name) = effective_from_name {
459            format!("{} <{}>", name, config.from)
460                .parse()
461                .map_err(|e| Error::mail(format!("Invalid from address: {e}")))?
462        } else {
463            config
464                .from
465                .parse()
466                .map_err(|e| Error::mail(format!("Invalid from address: {e}")))?
467        };
468
469        let to_mailbox: Mailbox = to
470            .parse()
471            .map_err(|e| Error::mail(format!("Invalid to address: {e}")))?;
472
473        let mut email_builder = Message::builder()
474            .from(from)
475            .to(to_mailbox)
476            .subject(&message.subject);
477
478        if let Some(ref reply_to) = message.reply_to {
479            let reply_to_mailbox: Mailbox = reply_to
480                .parse()
481                .map_err(|e| Error::mail(format!("Invalid reply-to address: {e}")))?;
482            email_builder = email_builder.reply_to(reply_to_mailbox);
483        }
484
485        for cc in &message.cc {
486            let cc_mailbox: Mailbox = cc
487                .parse()
488                .map_err(|e| Error::mail(format!("Invalid CC address: {e}")))?;
489            email_builder = email_builder.cc(cc_mailbox);
490        }
491
492        for bcc in &message.bcc {
493            let bcc_mailbox: Mailbox = bcc
494                .parse()
495                .map_err(|e| Error::mail(format!("Invalid BCC address: {e}")))?;
496            email_builder = email_builder.bcc(bcc_mailbox);
497        }
498
499        // Body part — single-part for backward-compat when no attachments,
500        // SinglePart wrapped in MultiPart::mixed otherwise.
501        let email = if message.attachments.is_empty() {
502            // Backward-compatible single-part path
503            if let Some(ref html) = message.html {
504                email_builder
505                    .header(ContentType::TEXT_HTML)
506                    .body(html.clone())
507                    .map_err(|e| Error::mail(format!("Failed to build email: {e}")))?
508            } else {
509                email_builder
510                    .header(ContentType::TEXT_PLAIN)
511                    .body(message.body.clone())
512                    .map_err(|e| Error::mail(format!("Failed to build email: {e}")))?
513            }
514        } else {
515            // Multipart path — body becomes a SinglePart inside MultiPart::mixed
516            let body_part = if let Some(ref html) = message.html {
517                SinglePart::html(html.clone())
518            } else {
519                SinglePart::plain(message.body.clone())
520            };
521
522            let mut mp = MultiPart::mixed().singlepart(body_part);
523            for att in &message.attachments {
524                let ct = ContentType::parse(&att.content_type).map_err(|e| {
525                    Error::mail(format!("Invalid content-type '{}': {e}", att.content_type))
526                })?;
527                let part = Attachment::new(att.filename.clone()).body(att.content.clone(), ct);
528                mp = mp.singlepart(part);
529            }
530
531            email_builder
532                .multipart(mp)
533                .map_err(|e| Error::mail(format!("Failed to build multipart email: {e}")))?
534        };
535
536        let transport = if smtp.tls {
537            AsyncSmtpTransport::<Tokio1Executor>::relay(&smtp.host)
538                .map_err(|e| Error::mail(format!("Failed to create transport: {e}")))?
539        } else {
540            AsyncSmtpTransport::<Tokio1Executor>::builder_dangerous(&smtp.host)
541        };
542
543        let transport = transport.port(smtp.port);
544
545        let transport = if let (Some(ref user), Some(ref pass)) = (&smtp.username, &smtp.password) {
546            transport.credentials(Credentials::new(user.clone(), pass.clone()))
547        } else {
548            transport
549        };
550
551        let mailer = transport.build();
552
553        mailer
554            .send(email)
555            .await
556            .map_err(|e| Error::mail(format!("Failed to send email: {e}")))?;
557
558        info!(to = %to, "Mail notification sent via SMTP");
559        Ok(())
560    }
561
562    /// Send mail via Resend HTTP API.
563    async fn send_mail_resend(
564        to: &str,
565        message: &MailMessage,
566        config: &MailConfig,
567    ) -> Result<(), Error> {
568        let resend = config
569            .resend
570            .as_ref()
571            .ok_or_else(|| Error::mail("Resend config missing for Resend driver"))?;
572
573        let from = message.from.clone().unwrap_or_else(|| {
574            let effective_name = message.from_name.as_ref().or(config.from_name.as_ref());
575            if let Some(name) = effective_name {
576                format!("{} <{}>", name, config.from)
577            } else {
578                config.from.clone()
579            }
580        });
581
582        use base64::Engine;
583
584        let attachments: Vec<ResendAttachment> = message
585            .attachments
586            .iter()
587            .map(|att| ResendAttachment {
588                filename: att.filename.clone(),
589                content: base64::engine::general_purpose::STANDARD.encode(&att.content),
590            })
591            .collect();
592
593        let payload = ResendEmailPayload {
594            from,
595            to: vec![to.to_string()],
596            subject: message.subject.clone(),
597            html: message.html.clone(),
598            text: if message.html.is_some() {
599                None
600            } else {
601                Some(message.body.clone())
602            },
603            cc: message.cc.clone(),
604            bcc: message.bcc.clone(),
605            reply_to: message.reply_to.clone(),
606            attachments,
607        };
608
609        let client = reqwest::Client::new();
610        let response = client
611            .post("https://api.resend.com/emails")
612            .bearer_auth(&resend.api_key)
613            .json(&payload)
614            .send()
615            .await
616            .map_err(|e| Error::mail(format!("Resend HTTP request failed: {e}")))?;
617
618        if !response.status().is_success() {
619            let status = response.status();
620            let body = response.text().await.unwrap_or_default();
621            error!(status = %status, body = %body, "Resend API error");
622            return Err(Error::mail(format!("Resend API error {status}: {body}")));
623        }
624
625        // Parse the success body to extract the Resend message id (correlation token
626        // for downstream debugging). Failure to parse is non-fatal: the send already
627        // succeeded per the 2xx status, so we log and continue without an id.
628        let resend_id = match response.json::<serde_json::Value>().await {
629            Ok(body) => body
630                .get("id")
631                .and_then(|v| v.as_str())
632                .map(str::to_owned)
633                .unwrap_or_else(|| "<no-id>".to_string()),
634            Err(e) => {
635                warn!(error = %e, "Resend response parse failed; continuing without id");
636                "<unparseable>".to_string()
637            }
638        };
639
640        info!(to = %to, resend_id = %resend_id, "Mail notification sent via Resend");
641        Ok(())
642    }
643
644    /// Send a database notification.
645    ///
646    /// Per CONTEXT.md D-13 / ARCH-FINDING-02, this calls
647    /// [`DatabaseNotificationStore::store`] when `NotificationConfig::database_store`
648    /// is configured. When unconfigured, retains the original placeholder log
649    /// (backward-compat: no consumer of the placeholder is broken).
650    async fn send_database<N: Notifiable + ?Sized>(
651        notifiable: &N,
652        message: &DatabaseMessage,
653    ) -> Result<(), Error> {
654        let notifiable_id = notifiable.notifiable_id();
655        let notifiable_type = notifiable.notifiable_type();
656
657        if let Some(store) = CONFIG.get().and_then(|c| c.database_store.as_ref()) {
658            store
659                .store(
660                    &notifiable_id,
661                    notifiable_type,
662                    &message.notification_type,
663                    message,
664                )
665                .await?;
666            info!(
667                notifiable_id = %notifiable_id,
668                notification_type = %message.notification_type,
669                "Database notification stored"
670            );
671        } else {
672            warn!(
673                notifiable_id = %notifiable_id,
674                notifiable_type = %notifiable_type,
675                notification_type = %message.notification_type,
676                data = ?message.data,
677                "Database notification dropped — no store configured. \
678                 Call NotificationConfig::with_database_store() at startup."
679            );
680        }
681
682        Ok(())
683    }
684
685    /// Send a Slack notification.
686    async fn send_slack<N: Notifiable + ?Sized>(
687        notifiable: &N,
688        message: &SlackMessage,
689    ) -> Result<(), Error> {
690        let webhook_url = notifiable
691            .route_notification_for(Channel::Slack)
692            .or_else(|| CONFIG.get().and_then(|c| c.slack_webhook.clone()))
693            .ok_or_else(|| Error::ChannelNotAvailable("No Slack webhook configured".into()))?;
694
695        info!(channel = ?message.channel, "Sending Slack notification");
696
697        let client = reqwest::Client::new();
698        let response = client
699            .post(&webhook_url)
700            .json(message)
701            .send()
702            .await
703            .map_err(|e| Error::slack(format!("HTTP request failed: {e}")))?;
704
705        if !response.status().is_success() {
706            let status = response.status();
707            let body = response.text().await.unwrap_or_default();
708            error!(status = %status, body = %body, "Slack webhook failed");
709            return Err(Error::slack(format!("Slack returned {status}: {body}")));
710        }
711
712        info!("Slack notification sent");
713        Ok(())
714    }
715
716    /// Send a WhatsApp notification via the static `ferro_whatsapp::WhatsApp` facade.
717    ///
718    /// Per CONTEXT.md D-04 / ARCH-FINDING-01, the adapter does NOT inject a client.
719    /// `ferro_whatsapp::WhatsApp` owns its global state via `WhatsApp::init` (called
720    /// once at app startup). The `whatsapp_enabled: false` default ensures this code
721    /// path is unreachable unless the consumer opted in.
722    ///
723    /// **Retry is the caller's responsibility.** This dispatcher performs no backoff,
724    /// jitter, or retry on transient failures. Match on the inner [`ferro_whatsapp::Error`]
725    /// via `Error::WhatsApp(inner)` to distinguish retryable variants (e.g. `RateLimit`,
726    /// `NetworkError`) from terminal ones (e.g. invalid phone number); recommended path
727    /// for retry is to enqueue the send through `ferro-queue`.
728    async fn send_whatsapp<N: Notifiable + ?Sized>(
729        notifiable: &N,
730        message: &WhatsAppMessage,
731    ) -> Result<(), Error> {
732        let enabled = CONFIG.get().map(|c| c.whatsapp_enabled).unwrap_or(false);
733
734        if !enabled {
735            info!("WhatsApp channel not configured (WHATSAPP_ENABLED=false)");
736            return Ok(());
737        }
738
739        let phone = notifiable
740            .route_notification_for(Channel::WhatsApp)
741            .ok_or_else(|| Error::ChannelNotAvailable("No WhatsApp route configured".into()))?;
742
743        info!(to = %phone, "Sending WhatsApp notification");
744
745        // The Error::WhatsApp(#[from]) conversion (Plan 02) handles the propagation.
746        let result = ferro_whatsapp::WhatsApp::send(&phone, message.message.clone()).await?;
747        info!(to = %phone, wamid = %result.wamid, "WhatsApp notification sent");
748        Ok(())
749    }
750
751    /// Send an in-app notification (per CONTEXT.md D-06, D-07, D-08).
752    ///
753    /// Writes both legs:
754    /// 1. Persists via [`DatabaseNotificationStore::store`] (DB leg first — broker can replay
755    ///    on reconnect from the store; the inverse order would risk silent loss).
756    /// 2. Publishes via `Broadcaster::broadcast` to channel `user.{id}` with event
757    ///    `Notification.{notification_type}` and the InAppMessage `data` as payload.
758    ///
759    /// Returns the first error encountered. The legs are NOT transactional: if the DB
760    /// write succeeds and the broadcast then fails, the persisted row remains and a
761    /// naive retry will create a duplicate. Per CONTEXT.md D-08 this is intentional —
762    /// the broker can replay from the store on reconnect — but callers performing
763    /// manual retries should dedupe on (notifiable_id, notification_type, idempotency-key).
764    /// `ferro_broadcast::Error` is mapped via [`Error::broadcast`] (no `#[from]` available).
765    async fn send_in_app<N: Notifiable + ?Sized>(
766        notifiable: &N,
767        message: &InAppMessage,
768    ) -> Result<(), Error> {
769        let cfg = match CONFIG.get().and_then(|c| c.in_app.as_ref()) {
770            Some(c) => c,
771            None => {
772                info!("InApp channel not configured");
773                return Ok(());
774            }
775        };
776
777        let notifiable_id = notifiable.notifiable_id();
778        let notifiable_type = notifiable.notifiable_type();
779
780        // Convert InAppMessage to DatabaseMessage for persistence.
781        // Per RESEARCH.md: object data → HashMap of fields; non-object → wrap as { "payload": value }.
782        let db_msg = inapp_to_database_message(message);
783
784        // Leg 1: persistence (DB-store first per D-08).
785        cfg.store
786            .store(
787                &notifiable_id,
788                notifiable_type,
789                &message.notification_type,
790                &db_msg,
791            )
792            .await?;
793
794        // Leg 2: real-time fanout.
795        let channel = format!("user.{notifiable_id}");
796        let event = format!("Notification.{}", message.notification_type);
797        cfg.broker
798            .broadcast(&channel, &event, &message.data)
799            .await
800            .map_err(|e| Error::broadcast(e.to_string()))?;
801
802        info!(
803            notifiable_id = %notifiable_id,
804            notification_type = %message.notification_type,
805            "InApp notification persisted and broadcast"
806        );
807        Ok(())
808    }
809}
810
811/// Convert an [`InAppMessage`] to a [`DatabaseMessage`] for persistence.
812///
813/// If `msg.data` is a JSON object, its fields become the DatabaseMessage data map.
814/// Otherwise, the value is wrapped under the key `"payload"`.
815fn inapp_to_database_message(msg: &InAppMessage) -> DatabaseMessage {
816    use std::collections::HashMap;
817    let data: HashMap<String, serde_json::Value> = if let serde_json::Value::Object(map) = &msg.data
818    {
819        map.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
820    } else {
821        let mut m = HashMap::new();
822        m.insert("payload".to_string(), msg.data.clone());
823        m
824    };
825    DatabaseMessage::new(&msg.notification_type).with_data(data)
826}
827
828#[cfg(test)]
829mod tests {
830    use super::*;
831    use serial_test::serial;
832
833    #[test]
834    fn test_mail_config_smtp_builder() {
835        let config = MailConfig::new("smtp.example.com", 587, "noreply@example.com")
836            .credentials("user", "pass")
837            .from_name("My App");
838
839        assert!(matches!(config.driver, MailDriver::Smtp));
840        assert_eq!(config.from, "noreply@example.com");
841        assert_eq!(config.from_name, Some("My App".to_string()));
842
843        let smtp = config.smtp.as_ref().unwrap();
844        assert_eq!(smtp.host, "smtp.example.com");
845        assert_eq!(smtp.port, 587);
846        assert_eq!(smtp.username, Some("user".to_string()));
847        assert_eq!(smtp.password, Some("pass".to_string()));
848        assert!(smtp.tls);
849        assert!(config.resend.is_none());
850    }
851
852    #[test]
853    fn test_mail_config_resend_builder() {
854        let config = MailConfig::resend("re_123456", "noreply@example.com").from_name("My App");
855
856        assert!(matches!(config.driver, MailDriver::Resend));
857        assert_eq!(config.from, "noreply@example.com");
858        assert_eq!(config.from_name, Some("My App".to_string()));
859
860        let resend = config.resend.as_ref().unwrap();
861        assert_eq!(resend.api_key, "re_123456");
862        assert!(config.smtp.is_none());
863    }
864
865    #[test]
866    fn test_mail_config_no_tls() {
867        let config = MailConfig::new("smtp.example.com", 587, "noreply@example.com").no_tls();
868
869        let smtp = config.smtp.as_ref().unwrap();
870        assert!(!smtp.tls);
871    }
872
873    #[test]
874    fn test_notification_config_default() {
875        let config = NotificationConfig::default();
876        assert!(config.mail.is_none());
877        assert!(config.slack_webhook.is_none());
878        assert!(!config.whatsapp_enabled);
879        assert!(config.in_app.is_none());
880        assert!(config.database_store.is_none());
881    }
882
883    #[test]
884    fn test_notification_config_with_database_store_builder() {
885        use crate::channels::DatabaseMessage;
886        use crate::notifiable::DatabaseNotificationStore;
887        use async_trait::async_trait;
888
889        struct NoopStore;
890        #[async_trait]
891        impl DatabaseNotificationStore for NoopStore {
892            async fn store(
893                &self,
894                _: &str,
895                _: &str,
896                _: &str,
897                _: &DatabaseMessage,
898            ) -> Result<(), Error> {
899                Ok(())
900            }
901            async fn mark_as_read(&self, _: &str) -> Result<(), Error> {
902                Ok(())
903            }
904            async fn unread(&self, _: &str) -> Result<Vec<crate::StoredNotification>, Error> {
905                Ok(vec![])
906            }
907        }
908
909        let store: Arc<dyn DatabaseNotificationStore> = Arc::new(NoopStore);
910        let config = NotificationConfig::new().with_database_store(store);
911        assert!(config.database_store.is_some());
912    }
913
914    #[test]
915    #[serial]
916    fn test_notification_config_whatsapp_enabled_from_env() {
917        unsafe { env::remove_var("WHATSAPP_ENABLED") };
918        with_env_vars(&[("WHATSAPP_ENABLED", "true")], || {
919            let config = NotificationConfig::from_env();
920            assert!(config.whatsapp_enabled);
921        });
922    }
923
924    #[test]
925    #[serial]
926    fn test_notification_config_whatsapp_disabled_when_env_false() {
927        unsafe { env::remove_var("WHATSAPP_ENABLED") };
928        with_env_vars(&[("WHATSAPP_ENABLED", "false")], || {
929            let config = NotificationConfig::from_env();
930            assert!(!config.whatsapp_enabled);
931        });
932    }
933
934    #[test]
935    #[serial]
936    fn test_notification_config_whatsapp_disabled_when_env_unset() {
937        unsafe { env::remove_var("WHATSAPP_ENABLED") };
938        let config = NotificationConfig::from_env();
939        assert!(!config.whatsapp_enabled);
940    }
941
942    #[test]
943    #[serial]
944    fn test_notification_config_whatsapp_disabled_when_env_garbage() {
945        unsafe { env::remove_var("WHATSAPP_ENABLED") };
946        with_env_vars(&[("WHATSAPP_ENABLED", "yes-please")], || {
947            let config = NotificationConfig::from_env();
948            assert!(
949                !config.whatsapp_enabled,
950                "non-bool string must fall back to false"
951            );
952        });
953    }
954
955    #[test]
956    fn test_notification_config_with_whatsapp_enabled_builder() {
957        let config = NotificationConfig::new().with_whatsapp_enabled(true);
958        assert!(config.whatsapp_enabled);
959        let config2 = NotificationConfig::new().with_whatsapp_enabled(false);
960        assert!(!config2.whatsapp_enabled);
961    }
962
963    /// Helper to run env-based tests with clean env var state.
964    fn with_env_vars<F: FnOnce()>(vars: &[(&str, &str)], f: F) {
965        // Set vars
966        for (key, val) in vars {
967            unsafe { env::set_var(key, val) };
968        }
969        f();
970        // Clean up
971        for (key, _) in vars {
972            unsafe { env::remove_var(key) };
973        }
974    }
975
976    /// Helper to ensure env vars are clean before a test.
977    fn clean_mail_env() {
978        let keys = [
979            "MAIL_DRIVER",
980            "MAIL_FROM_ADDRESS",
981            "MAIL_FROM_NAME",
982            "MAIL_HOST",
983            "MAIL_PORT",
984            "MAIL_USERNAME",
985            "MAIL_PASSWORD",
986            "MAIL_ENCRYPTION",
987            "RESEND_API_KEY",
988        ];
989        for key in keys {
990            unsafe { env::remove_var(key) };
991        }
992    }
993
994    #[test]
995    #[serial]
996    fn test_mail_config_smtp_from_env() {
997        clean_mail_env();
998        with_env_vars(
999            &[
1000                ("MAIL_FROM_ADDRESS", "noreply@example.com"),
1001                ("MAIL_FROM_NAME", "Test App"),
1002                ("MAIL_HOST", "smtp.example.com"),
1003                ("MAIL_PORT", "465"),
1004                ("MAIL_USERNAME", "user@example.com"),
1005                ("MAIL_PASSWORD", "secret"),
1006                ("MAIL_ENCRYPTION", "tls"),
1007            ],
1008            || {
1009                let config = MailConfig::from_env().expect("should parse SMTP config");
1010                assert!(matches!(config.driver, MailDriver::Smtp));
1011                assert_eq!(config.from, "noreply@example.com");
1012                assert_eq!(config.from_name, Some("Test App".to_string()));
1013
1014                let smtp = config.smtp.as_ref().expect("smtp config present");
1015                assert_eq!(smtp.host, "smtp.example.com");
1016                assert_eq!(smtp.port, 465);
1017                assert_eq!(smtp.username, Some("user@example.com".to_string()));
1018                assert_eq!(smtp.password, Some("secret".to_string()));
1019                assert!(smtp.tls);
1020                assert!(config.resend.is_none());
1021            },
1022        );
1023    }
1024
1025    #[test]
1026    #[serial]
1027    fn test_mail_config_resend_from_env() {
1028        clean_mail_env();
1029        with_env_vars(
1030            &[
1031                ("MAIL_DRIVER", "resend"),
1032                ("MAIL_FROM_ADDRESS", "noreply@example.com"),
1033                ("MAIL_FROM_NAME", "Test App"),
1034                ("RESEND_API_KEY", "re_test_123456"),
1035            ],
1036            || {
1037                let config = MailConfig::from_env().expect("should parse Resend config");
1038                assert!(matches!(config.driver, MailDriver::Resend));
1039                assert_eq!(config.from, "noreply@example.com");
1040                assert_eq!(config.from_name, Some("Test App".to_string()));
1041
1042                let resend = config.resend.as_ref().expect("resend config present");
1043                assert_eq!(resend.api_key, "re_test_123456");
1044                assert!(config.smtp.is_none());
1045            },
1046        );
1047    }
1048
1049    #[test]
1050    #[serial]
1051    fn test_mail_config_default_driver() {
1052        clean_mail_env();
1053        with_env_vars(
1054            &[
1055                ("MAIL_FROM_ADDRESS", "noreply@example.com"),
1056                ("MAIL_HOST", "smtp.example.com"),
1057            ],
1058            || {
1059                let config = MailConfig::from_env().expect("should default to SMTP");
1060                assert!(matches!(config.driver, MailDriver::Smtp));
1061                assert_eq!(config.smtp.as_ref().unwrap().host, "smtp.example.com");
1062                assert_eq!(config.smtp.as_ref().unwrap().port, 587); // default port
1063            },
1064        );
1065    }
1066
1067    #[test]
1068    #[serial]
1069    fn test_mail_config_resend_missing_api_key() {
1070        clean_mail_env();
1071        with_env_vars(
1072            &[
1073                ("MAIL_DRIVER", "resend"),
1074                ("MAIL_FROM_ADDRESS", "noreply@example.com"),
1075            ],
1076            || {
1077                let config = MailConfig::from_env();
1078                assert!(
1079                    config.is_none(),
1080                    "should return None when RESEND_API_KEY missing"
1081                );
1082            },
1083        );
1084    }
1085
1086    #[test]
1087    fn test_resend_payload_serialization() {
1088        let payload = ResendEmailPayload {
1089            from: "sender@example.com".into(),
1090            to: vec!["recipient@example.com".into()],
1091            subject: "Test".into(),
1092            html: Some("<p>Hello</p>".into()),
1093            text: None,
1094            cc: vec![],
1095            bcc: vec![],
1096            reply_to: None,
1097            attachments: vec![],
1098        };
1099
1100        let json = serde_json::to_value(&payload).unwrap();
1101        assert_eq!(json["from"], "sender@example.com");
1102        assert_eq!(json["to"][0], "recipient@example.com");
1103        assert_eq!(json["subject"], "Test");
1104        assert_eq!(json["html"], "<p>Hello</p>");
1105        // skip_serializing_if fields should be absent
1106        assert!(json.get("text").is_none());
1107        assert!(json.get("cc").is_none());
1108        assert!(json.get("bcc").is_none());
1109        assert!(json.get("reply_to").is_none());
1110        assert!(json.get("attachments").is_none());
1111    }
1112
1113    #[test]
1114    fn test_resend_payload_text_fallback() {
1115        let payload = ResendEmailPayload {
1116            from: "sender@example.com".into(),
1117            to: vec!["recipient@example.com".into()],
1118            subject: "Test".into(),
1119            html: None,
1120            text: Some("Plain text body".into()),
1121            cc: vec!["cc@example.com".into()],
1122            bcc: vec!["bcc@example.com".into()],
1123            reply_to: Some("reply@example.com".into()),
1124            attachments: vec![],
1125        };
1126
1127        let json = serde_json::to_value(&payload).unwrap();
1128        assert!(json.get("html").is_none());
1129        assert_eq!(json["text"], "Plain text body");
1130        assert_eq!(json["cc"][0], "cc@example.com");
1131        assert_eq!(json["bcc"][0], "bcc@example.com");
1132        assert_eq!(json["reply_to"], "reply@example.com");
1133    }
1134
1135    #[test]
1136    fn test_resend_payload_no_attachments_omits_field() {
1137        // Regression guard: when attachments empty, the JSON payload has NO "attachments" key.
1138        let payload = ResendEmailPayload {
1139            from: "sender@example.com".into(),
1140            to: vec!["recipient@example.com".into()],
1141            subject: "Test".into(),
1142            html: Some("<p>Hello</p>".into()),
1143            text: None,
1144            cc: vec![],
1145            bcc: vec![],
1146            reply_to: None,
1147            attachments: vec![],
1148        };
1149        let json = serde_json::to_value(&payload).unwrap();
1150        assert!(
1151            json.get("attachments").is_none(),
1152            "Empty attachments must not appear in serialized payload (byte-identical-to-today guarantee)"
1153        );
1154    }
1155
1156    #[test]
1157    fn test_resend_payload_with_attachments_serializes_base64() {
1158        let payload = ResendEmailPayload {
1159            from: "sender@example.com".into(),
1160            to: vec!["recipient@example.com".into()],
1161            subject: "Test".into(),
1162            html: None,
1163            text: Some("body".into()),
1164            cc: vec![],
1165            bcc: vec![],
1166            reply_to: None,
1167            attachments: vec![ResendAttachment {
1168                filename: "hi.txt".into(),
1169                // "hello" -> base64 standard = "aGVsbG8="
1170                content: "aGVsbG8=".into(),
1171            }],
1172        };
1173        let json = serde_json::to_value(&payload).unwrap();
1174        assert_eq!(json["attachments"][0]["filename"], "hi.txt");
1175        assert_eq!(json["attachments"][0]["content"], "aGVsbG8=");
1176        assert_eq!(json["attachments"].as_array().unwrap().len(), 1);
1177    }
1178
1179    #[test]
1180    fn test_base64_encoding_uses_standard_alphabet() {
1181        use base64::Engine;
1182        // Verify a known fixture: "Many hands make light work."
1183        // Standard base64: "TWFueSBoYW5kcyBtYWtlIGxpZ2h0IHdvcmsu"
1184        // URL-safe would substitute / and + characters; standard is what Resend expects.
1185        let encoded =
1186            base64::engine::general_purpose::STANDARD.encode(b"Many hands make light work.");
1187        assert_eq!(encoded, "TWFueSBoYW5kcyBtYWtlIGxpZ2h0IHdvcmsu");
1188    }
1189
1190    #[test]
1191    fn test_send_whatsapp_disabled_returns_ok_without_calling_init() {
1192        // The behavior contract: when whatsapp_enabled is false, send_whatsapp must NOT
1193        // touch ferro_whatsapp at all. We verify this indirectly by checking that
1194        // NotificationConfig::default().whatsapp_enabled is false (already covered by
1195        // test_notification_config_default), and that the dispatcher gating is
1196        // observable through the public `whatsapp_enabled` field.
1197        //
1198        // A live integration test that sends a real WhatsApp message lives downstream
1199        // in gestiscilo-it Phase 120 (per ROADMAP success criterion #7).
1200        let config = NotificationConfig::default();
1201        assert!(
1202            !config.whatsapp_enabled,
1203            "Default whatsapp_enabled must be false so dispatch path is gated"
1204        );
1205    }
1206
1207    #[test]
1208    fn test_smtp_multipart_path_compiles_with_attachment() {
1209        // Smoke test — actual SMTP send is exercised by the Mailpit integration test in Plan 07.
1210        // Here we just construct a MailMessage with an attachment and verify the type compiles end-to-end.
1211        use crate::channels::MailMessage;
1212        let mail = MailMessage::new()
1213            .subject("Test")
1214            .body("Hello")
1215            .attachment("test.txt", "text/plain", b"hello".to_vec())
1216            .expect("under-limit attachment must succeed");
1217        assert_eq!(mail.attachments.len(), 1);
1218        assert_eq!(mail.attachments[0].content_type, "text/plain");
1219    }
1220
1221    #[test]
1222    fn test_inapp_to_database_message_object_data_flattens() {
1223        use crate::channels::{InAppMessage, InAppSeverity};
1224        let msg = InAppMessage::new("OrderShipped")
1225            .data(serde_json::json!({"order_id": 42, "tracking": "ABC"}))
1226            .severity(InAppSeverity::Success);
1227        let db = inapp_to_database_message(&msg);
1228        assert_eq!(db.notification_type, "OrderShipped");
1229        assert_eq!(db.get("order_id"), Some(&serde_json::json!(42)));
1230        assert_eq!(db.get("tracking"), Some(&serde_json::json!("ABC")));
1231        assert!(
1232            db.get("payload").is_none(),
1233            "object data must NOT be wrapped under 'payload'"
1234        );
1235    }
1236
1237    #[test]
1238    fn test_inapp_to_database_message_non_object_wraps_under_payload() {
1239        use crate::channels::InAppMessage;
1240        let msg = InAppMessage::new("Heartbeat").data(serde_json::json!("ping"));
1241        let db = inapp_to_database_message(&msg);
1242        assert_eq!(db.notification_type, "Heartbeat");
1243        assert_eq!(db.get("payload"), Some(&serde_json::json!("ping")));
1244    }
1245
1246    #[tokio::test]
1247    async fn test_send_database_calls_store_when_configured() {
1248        // Substantive behavior — store.store() is invoked with the dispatcher's wiring —
1249        // is best exercised at the consumer integration level (gestiscilo-it Phase 120).
1250        // Since CONFIG is a OnceLock global, we cannot inject a fresh store per test
1251        // without restructuring the dispatcher. Here we verify the *path* exists by
1252        // exercising the trait directly through the public builder shape.
1253        use crate::channels::DatabaseMessage;
1254        use crate::notifiable::DatabaseNotificationStore;
1255        use async_trait::async_trait;
1256        use std::sync::atomic::{AtomicUsize, Ordering};
1257
1258        struct CountingStore {
1259            calls: AtomicUsize,
1260        }
1261        #[async_trait]
1262        impl DatabaseNotificationStore for CountingStore {
1263            async fn store(
1264                &self,
1265                _: &str,
1266                _: &str,
1267                _: &str,
1268                _: &DatabaseMessage,
1269            ) -> Result<(), Error> {
1270                self.calls.fetch_add(1, Ordering::SeqCst);
1271                Ok(())
1272            }
1273            async fn mark_as_read(&self, _: &str) -> Result<(), Error> {
1274                Ok(())
1275            }
1276            async fn unread(&self, _: &str) -> Result<Vec<crate::StoredNotification>, Error> {
1277                Ok(vec![])
1278            }
1279        }
1280
1281        let store = Arc::new(CountingStore {
1282            calls: AtomicUsize::new(0),
1283        });
1284        let msg = DatabaseMessage::new("Test").data("k", "v");
1285        store
1286            .store("user_id", "User", &msg.notification_type, &msg)
1287            .await
1288            .unwrap();
1289        assert_eq!(store.calls.load(Ordering::SeqCst), 1);
1290    }
1291}