1use crate::endpoints::{Endpoint, EndpointConfig};
20use crate::notifications::{Key, ValidatedNotification};
21use crate::Error;
22use async_trait::async_trait;
23use tracing::{debug, error, info};
24use mail_send::mail_builder::MessageBuilder;
25use mail_send::SmtpClientBuilder;
26use serde::Deserialize;
27use std::any::Any;
28use std::collections::{HashMap, HashSet};
29use tokio::sync::{broadcast, watch};
30
/// Email endpoint settings in the shape they are deserialized from configuration.
///
/// This is the raw, unvalidated form: it is checked and converted into an
/// [`EmailEndpoint`] by `EmailEndpoint::try_from`.
///
/// NOTE(review): the derived `Debug` prints `password` in clear text — avoid
/// logging this struct with `{:?}`.
#[derive(Debug, Deserialize, PartialEq, Eq, Hash, Clone)]
pub(crate) struct EmailConfigFile {
    /// Host name of the SMTP server to connect to.
    hostname: String,
    /// SMTP server port. Kept as `i64` here; range-checked and narrowed to
    /// `u16` when converting into an [`EmailEndpoint`].
    port: i64,
    /// User name used as SMTP credentials.
    username: String,
    /// Password used as SMTP credentials.
    password: String,
    /// Passed to the SMTP client builder's `implicit_tls` setting.
    /// Falls back to `bool::default()` (`false`) when absent.
    #[serde(default)]
    implicit_tls: bool,
    /// When `true`, the SMTP client is configured to accept invalid certificates.
    /// Falls back to `bool::default()` (`false`) when absent.
    #[serde(default)]
    allow_invalid_certs: bool,
    /// Sender address used for outgoing emails.
    from: String,
    /// Recipient addresses; must not be empty to pass validation.
    to: Vec<String>,
    /// Subject line used for every outgoing email.
    subject: String,
    /// Names of the notifications this endpoint handles; must not be empty
    /// to pass validation.
    notifications: Vec<String>,
}
47
/// Validated email endpoint, built from an [`EmailConfigFile`] via `TryFrom`.
///
/// Holds the SMTP connection settings, the message envelope (`from`, `to`,
/// `subject`) and the names of the notifications it handles.
///
/// NOTE(review): the derived `Debug` prints `password` in clear text — avoid
/// logging this struct with `{:?}`.
#[derive(Debug, Clone)]
pub struct EmailEndpoint {
    /// Host name of the SMTP server.
    hostname: String,
    /// SMTP server port, already narrowed to `u16`.
    port: u16,
    /// User name used as SMTP credentials.
    username: String,
    /// Password used as SMTP credentials.
    password: String,
    /// Passed to the SMTP client builder's `implicit_tls` setting.
    implicit_tls: bool,
    /// When `true`, the SMTP client is configured to accept invalid certificates.
    allow_invalid_certs: bool,
    /// Sender address used for outgoing emails.
    from: String,
    /// Recipient addresses.
    to: Vec<String>,
    /// Subject line used for every outgoing email.
    subject: String,
    /// Names of the notifications this endpoint handles; each one is turned
    /// into a `Key` by `generate_keys`.
    notifications: Vec<String>,
}
/// Owned copy of the SMTP and envelope settings of an [`EmailEndpoint`]
/// (everything except `notifications`), handed to the background sending task.
///
/// NOTE(review): the derived `Debug` prints `password` in clear text — avoid
/// logging this struct with `{:?}`.
#[derive(Debug, Clone)]
struct EmailInfo {
    /// Host name of the SMTP server.
    hostname: String,
    /// SMTP server port.
    port: u16,
    /// User name used as SMTP credentials.
    username: String,
    /// Password used as SMTP credentials.
    password: String,
    /// Passed to the SMTP client builder's `implicit_tls` setting.
    implicit_tls: bool,
    /// When `true`, the SMTP client is configured to accept invalid certificates.
    allow_invalid_certs: bool,
    /// Sender address used for outgoing emails.
    from: String,
    /// Recipient addresses.
    to: Vec<String>,
    /// Subject line used for every outgoing email.
    subject: String,
}
74
75#[typetag::deserialize(name = "email")]
76impl EndpointConfig for EmailConfigFile {
77 fn to_endpoint(&self) -> Result<Box<dyn Endpoint + Send>, Error> {
78 Ok(Box::new(EmailEndpoint::try_from(self)?))
79 }
80}
81
82impl TryFrom<&EmailConfigFile> for EmailEndpoint {
83 type Error = Error;
84
85 fn try_from(value: &EmailConfigFile) -> Result<Self, Self::Error> {
86 if !(value.port < u16::MAX as i64 && value.port > u16::MIN as i64) {
87 return Err(Error::InvalidPortNumber(value.port));
88 } else if value.to.is_empty() {
89 return Err(Error::InvalidEndpointConfiguration(
90 "Email configuration has no 'to' email address setup".to_string(),
91 ));
92 } else if value.notifications.is_empty() {
93 return Err(Error::InvalidEndpointConfiguration(
94 "Email configuration has no notifications setup".to_string(),
95 ));
96 }
97
98 Ok(Self {
99 hostname: value.hostname.clone(),
100 port: value.port as u16,
101 username: value.username.clone(),
102 password: value.password.clone(),
103 implicit_tls: value.implicit_tls,
104 allow_invalid_certs: value.allow_invalid_certs,
105 from: value.from.clone(),
106 to: value.to.clone(),
107 subject: value.subject.clone(),
108 notifications: value.notifications.clone(),
109 })
110 }
111}
112
113#[async_trait]
114impl Endpoint for EmailEndpoint {
115 async fn notify(
116 &self,
117 endpoint_rx: broadcast::Receiver<ValidatedNotification>,
118 shutdown: watch::Receiver<bool>,
119 ) -> Result<(), Error> {
120 info!("Setting up Endpoint: Email -> {}:{} from {} with subject {}", self.hostname.as_str(), self.port, self.from.as_str(), self.subject.as_str());
121
122 let email_info = EmailInfo {
123 hostname: self.hostname.clone(),
124 port: self.port,
125 username: self.username.clone(),
126 password: self.password.clone(),
127 implicit_tls: self.implicit_tls,
128 allow_invalid_certs: self.allow_invalid_certs,
129 from: self.from.clone(),
130 to: self.to.clone(),
131 subject: self.subject.clone(),
132 };
133
134 tokio::spawn(async move { send_emails(endpoint_rx, shutdown, email_info).await });
135
136 Ok(())
137 }
138
139 fn generate_keys(&self, hash_key: &Key) -> HashMap<String, HashSet<Key>> {
140 let keys: HashSet<Key> = self
141 .notifications
142 .iter()
143 .map(|notification_name| Key::generate(notification_name.as_str(), hash_key))
144 .collect();
145
146 let mut map = HashMap::new();
147 map.insert("".to_string(), keys);
148 map
149 }
150
151 fn as_any(&self) -> &dyn Any {
152 self
153 }
154}
155
156async fn send_emails(
157 endpoint_rx: broadcast::Receiver<ValidatedNotification>,
158 shutdown: watch::Receiver<bool>,
159 info: EmailInfo,
160) {
161 let mut rx = endpoint_rx.resubscribe();
162 let mut shutdown_rx = shutdown.clone();
163
164 loop {
165 let info = info.clone();
166 tokio::select! {
167 received = rx.recv() => {
168 if let Ok(message) = received {
169 debug!("Email endpoint received message");
170
171 tokio::spawn( async move {
172 let content = message.message().text();
173 let email = MessageBuilder::new()
174 .from(info.from.as_str())
175 .subject(info.subject.as_str())
176 .to(info.to.clone())
177 .text_body(content);
178
179 debug!("Connecting to SMTP: {}:{} as {}", info.hostname.as_str(), info.port, info.username.as_str());
180 let mut smpt_client = SmtpClientBuilder::new(info.hostname.as_str(), info.port)
181 .implicit_tls(info.implicit_tls)
182 .credentials((info.username.as_str(), info.password.as_str()));
183
184 if info.allow_invalid_certs {
185 smpt_client = smpt_client.allow_invalid_certs();
186 }
187
188 match smpt_client.connect().await {
189 Ok(mut client) => {
190 match client.send(email).await {
191 Ok(_) => debug!("Email sent successfully"),
192 Err(e) => error!("Unable to connect to smtp server: {}", e),
193 }
194 }
195 Err(e) => error!("Unable to send email: {}", e)
196 }
197 }).await.unwrap();
198
199 }
200 }
201
202 _ = shutdown_rx.changed() => {
203 break;
204 }
205 }
206 }
207}