1use crate::event_keys::EventKey;
2use anyhow::Result as AnyhowResult;
3use serde::{Deserialize, Serialize};
4use serde_json::Value as JsonValue;
5#[cfg(feature = "worker-v0-5")]
6use worker::{Env, Request};
7#[cfg(feature = "worker-v0-4")]
8use worker_v4::{Env, Request};
9
10#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
11#[serde(rename_all = "lowercase")]
12pub enum NotificationType {
13 Email,
14 Push,
15 Sms,
16 Web,
17}
18
19#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
20pub struct Recipient {
21 pub user_id: String,
22 #[serde(skip_serializing_if = "Option::is_none")]
23 pub email: Option<String>,
24 #[serde(skip_serializing_if = "Option::is_none")]
25 pub phone_number: Option<String>,
26 #[serde(skip_serializing_if = "Option::is_none")]
27 pub push_token: Option<String>,
28}
29
30#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
31#[serde(rename_all = "lowercase")]
32pub enum Priority {
33 High,
34 Normal,
35 Low,
36}
37
38fn default_priority() -> Priority {
40 Priority::Normal
41}
42
43#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct NotificationEvent {
45 #[serde(rename = "type")]
46 pub key: EventKey,
47 pub recipient: Recipient,
48 #[serde(rename = "notification_type")]
49 pub notification_type: Vec<NotificationType>,
50 #[serde(default, alias = "payloads", alias = "context")]
51 pub payloads: Vec<JsonValue>,
52 #[serde(default = "default_priority")]
53 pub priority: Priority,
54}
55
56impl NotificationEvent {
57 pub fn new(
58 key: EventKey,
59 recipient: Recipient,
60 notification_type: Vec<NotificationType>,
61 payloads: Vec<JsonValue>,
62 priority: Option<Priority>,
63 ) -> Self {
64 NotificationEvent {
65 key,
66 recipient,
67 notification_type,
68 payloads,
69 priority: priority.unwrap_or_else(default_priority),
70 }
71 }
72
73 pub fn validate(&self) -> Result<(), String> {
74 if self.notification_type.is_empty() {
75 return Err("At least one notification type is required".to_string());
76 }
77
78 for ty in &self.notification_type {
79 match ty {
80 NotificationType::Email => {
81 if self.recipient.email.is_none() {
82 return Err("Email notification requires an email address".to_string());
83 }
84 }
85 NotificationType::Sms => {
86 if self.recipient.phone_number.is_none() {
87 return Err("SMS notification requires a phone number".to_string());
88 }
89 }
90 NotificationType::Push => {
91 if self.recipient.push_token.is_none() {
92 return Err("Push notification requires a push token".to_string());
93 }
94 }
95 NotificationType::Web => {}
96 }
97 }
98
99 Ok(())
100 }
101
102 pub fn has_notification(&self, target: NotificationType) -> bool {
103 self.notification_type.contains(&target)
104 }
105}
106
107pub async fn send_notification(
108 notification: NotificationEvent,
109 notifications_service_url: String,
110 req: &Request,
111 env: &Env,
112) -> AnyhowResult<Option<JsonValue>> {
113 use utils::*;
114 #[cfg(feature = "worker-v0-5")]
115 use worker::Method;
116 #[cfg(feature = "worker-v0-4")]
117 use worker_v4::Method;
118
119 let url = notifications_service_url + "/dispatch";
120 let jwt = utils::extract_jwt_from_headers(req)?;
121
122 let auth_header = AuthorizationHeaders::Bearer(jwt.as_str());
123 let method = Method::Post;
124 let body = serde_json::to_value(¬ification)?;
125 let content_type = ContentTypes::Json;
126 let request_type = RequestType::Internal("notifications", &env);
127 let rough_result = request::<JsonValue>(
128 url.as_str(),
129 method,
130 Some(body),
131 Some(auth_header),
132 content_type,
133 request_type,
134 )
135 .await?;
136
137 Ok(rough_result)
138}
139
140mod utils {
141 use anyhow::{Context, Result as AnyhowResult};
142 use base64::{Engine, prelude::BASE64_STANDARD};
143 use core::fmt;
144 use serde::de::DeserializeOwned;
145 use serde_json::{Value, to_string};
146 #[cfg(feature = "worker-v0-5")]
147 use worker::{Env, Fetch, Headers, Method, Request, RequestInit, wasm_bindgen::JsValue};
148 #[cfg(feature = "worker-v0-4")]
149 use worker_v4::{Env, Fetch, Headers, Method, Request, RequestInit, wasm_bindgen::JsValue};
150
151 pub fn extract_jwt_from_headers(req: &Request) -> AnyhowResult<String> {
152 let auth_header = req
153 .headers()
154 .get("Authorization")
155 .context("Failed to retrieve Authorization header")?;
156
157 let token = auth_header
158 .context("Authorization header missing")?
159 .strip_prefix("Bearer ")
160 .context("Authorization header must start with 'Bearer '")?
161 .trim()
162 .to_string();
163
164 Ok(token)
165 }
166
167 #[allow(dead_code)]
169 #[derive(Debug)]
170 pub enum AuthorizationHeaders<'a> {
171 Bearer(&'a str),
172 Basic(&'a str, &'a str),
173 }
174
175 #[allow(dead_code)]
177 #[derive(Debug)]
178 pub enum ContentTypes {
179 Json,
180 XWWWFormUrlEncoded,
181 }
182
183 #[allow(dead_code)]
185 pub enum RequestType<'a> {
186 Internal(&'a str, &'a Env),
187 External,
188 }
189
190 impl<'a> fmt::Debug for RequestType<'a> {
191 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
192 match self {
193 RequestType::Internal(service_name, _ctx) => f
194 .debug_tuple("Internal")
195 .field(service_name)
196 .field(&"<RouteContext>")
197 .finish(),
198 RequestType::External => f.debug_tuple("External").finish(),
199 }
200 }
201 }
202
203 pub async fn request<'a, R: DeserializeOwned>(
204 url: &str,
205 method: Method,
206 body: Option<Value>,
207 auth_header: Option<AuthorizationHeaders<'a>>,
208 content_type: ContentTypes,
209 request_type: RequestType<'a>,
210 ) -> AnyhowResult<Option<R>> {
211 fn append_header(headers: &mut Headers, key: &str, value: &str) -> AnyhowResult<()> {
213 headers
214 .append(key, value)
215 .context(format!("Failed to append {} header", key))?;
216 Ok(())
217 }
218
219 let body = body
221 .map(|val| to_string(&val))
222 .transpose()?
223 .map(|s| JsValue::from_str(&s));
224
225 let mut headers = Headers::default();
227 if let Some(auth) = auth_header {
228 let auth_value = match auth {
229 AuthorizationHeaders::Bearer(token) => format!("Bearer {}", token),
230 AuthorizationHeaders::Basic(user, pass) => {
231 let credentials = BASE64_STANDARD.encode(format!("{}:{}", user, pass));
232 format!("Basic {}", credentials)
233 }
234 };
235 append_header(&mut headers, "Authorization", &auth_value)?;
236 }
237
238 let content_type_str = match content_type {
239 ContentTypes::Json => "application/json",
240 ContentTypes::XWWWFormUrlEncoded => "application/x-www-form-urlencoded",
241 };
242 append_header(&mut headers, "Content-Type", content_type_str)?;
243
244 let req = Request::new_with_init(
246 url,
247 &RequestInit {
248 method,
249 body,
250 headers,
251 ..Default::default()
252 },
253 )
254 .context("Failed to create request")?;
255
256 let mut resp = match request_type {
258 RequestType::Internal(service_name, env) => {
259 let fetcher = env
260 .service(service_name)
261 .context("Failed to get internal service binding")?;
262 fetcher.fetch_request(req).await
263 }
264 RequestType::External => Fetch::Request(req).send().await,
265 }
266 .context("Failed to send request")?;
267
268 let status = resp.status_code();
270 if !(200..=299).contains(&status) {
271 return Err(anyhow::anyhow!("Request failed with status: {}", status));
272 }
273
274 let text = resp.text().await.context("Failed to read response text")?;
276 if text.is_empty() {
277 Ok(None)
278 } else {
279 let deserialized = serde_json::from_str(&text)
280 .with_context(|| format!("Failed to deserialize response (status {}): {}", status, text))?;
281 Ok(Some(deserialized))
282 }
283 }
284}
285
#[cfg(test)]
mod tests {
    use crate::notifications::{NotificationEvent, NotificationType, Priority};

    /// An event that carries its data under `"payloads"` and omits `"priority"`.
    #[test]
    fn test_fluvio_deserialization() {
        let raw = r#"{
            "type": "sync.disputer.disputes.created",
            "recipient": { "user_id": "user123" },
            "notification_type": ["web"],
            "payloads": [
                {
                    "amount": 130.78,
                    "charge": "ch_3QDoFsJkaap6UV2A0XFo8B8r"
                }
            ]
        }"#;

        let parsed = serde_json::from_str::<NotificationEvent>(raw).unwrap();

        assert_eq!(parsed.key.as_str(), "sync.disputer.disputes.created");
        assert_eq!(parsed.payloads.len(), 1);
        assert_eq!(parsed.recipient.user_id, "user123");
    }

    /// An event that carries its data under the `"context"` alias and sets `"priority"`.
    #[test]
    fn test_modern_deserialization() {
        let raw = r#"{
            "type": "account.welcome",
            "recipient": {"user_id": "user123", "email": "user@example.com"},
            "notification_type": ["email"],
            "context": [{ "name": "John" }],
            "priority": "high"
        }"#;

        let parsed = serde_json::from_str::<NotificationEvent>(raw).unwrap();

        assert_eq!(parsed.key.as_str(), "account.welcome");
        assert_eq!(parsed.recipient.user_id, "user123");
        assert_eq!(parsed.notification_type, vec![NotificationType::Email]);
        assert_eq!(parsed.payloads.len(), 1);
        assert_eq!(parsed.priority, Priority::High);
    }
}