Skip to main content

safeapp_notifier/
notifications.rs

1use crate::event_keys::EventKey;
2use anyhow::Result as AnyhowResult;
3use serde::{Deserialize, Serialize};
4use serde_json::Value as JsonValue;
5#[cfg(feature = "worker-v0-5")]
6use worker::{Env, Request};
7#[cfg(feature = "worker-v0-4")]
8use worker_v4::{Env, Request};
9
10#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
11#[serde(rename_all = "lowercase")]
12pub enum NotificationType {
13    Email,
14    Push,
15    Sms,
16    Web,
17}
18
19#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
20pub struct Recipient {
21    pub user_id: String,
22    #[serde(skip_serializing_if = "Option::is_none")]
23    pub email: Option<String>,
24    #[serde(skip_serializing_if = "Option::is_none")]
25    pub phone_number: Option<String>,
26    #[serde(skip_serializing_if = "Option::is_none")]
27    pub push_token: Option<String>,
28}
29
30#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
31#[serde(rename_all = "lowercase")]
32pub enum Priority {
33    High,
34    Normal,
35    Low,
36}
37
38// Default functions
39fn default_priority() -> Priority {
40    Priority::Normal
41}
42
43#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct NotificationEvent {
45    #[serde(rename = "type")]
46    pub key: EventKey,
47    pub recipient: Recipient,
48    #[serde(rename = "notification_type")]
49    pub notification_type: Vec<NotificationType>,
50    #[serde(default, alias = "payloads", alias = "context")]
51    pub payloads: Vec<JsonValue>,
52    #[serde(default = "default_priority")]
53    pub priority: Priority,
54}
55
56impl NotificationEvent {
57    pub fn new(
58        key: EventKey,
59        recipient: Recipient,
60        notification_type: Vec<NotificationType>,
61        payloads: Vec<JsonValue>,
62        priority: Option<Priority>,
63    ) -> Self {
64        NotificationEvent {
65            key,
66            recipient,
67            notification_type,
68            payloads,
69            priority: priority.unwrap_or_else(default_priority),
70        }
71    }
72
73    pub fn validate(&self) -> Result<(), String> {
74        if self.notification_type.is_empty() {
75            return Err("At least one notification type is required".to_string());
76        }
77
78        for ty in &self.notification_type {
79            match ty {
80                NotificationType::Email => {
81                    if self.recipient.email.is_none() {
82                        return Err("Email notification requires an email address".to_string());
83                    }
84                }
85                NotificationType::Sms => {
86                    if self.recipient.phone_number.is_none() {
87                        return Err("SMS notification requires a phone number".to_string());
88                    }
89                }
90                NotificationType::Push => {
91                    if self.recipient.push_token.is_none() {
92                        return Err("Push notification requires a push token".to_string());
93                    }
94                }
95                NotificationType::Web => {}
96            }
97        }
98
99        Ok(())
100    }
101
102    pub fn has_notification(&self, target: NotificationType) -> bool {
103        self.notification_type.contains(&target)
104    }
105}
106
107pub async fn send_notification(
108    notification: NotificationEvent,
109    notifications_service_url: String,
110    req: &Request,
111    env: &Env,
112) -> AnyhowResult<Option<JsonValue>> {
113    use utils::*;
114    #[cfg(feature = "worker-v0-5")]
115    use worker::Method;
116    #[cfg(feature = "worker-v0-4")]
117    use worker_v4::Method;
118
119    let url = notifications_service_url + "/dispatch";
120    let jwt = utils::extract_jwt_from_headers(req)?;
121
122    let auth_header = AuthorizationHeaders::Bearer(jwt.as_str());
123    let method = Method::Post;
124    let body = serde_json::to_value(&notification)?;
125    let content_type = ContentTypes::Json;
126    let request_type = RequestType::Internal("notifications", &env);
127    let rough_result = request::<JsonValue>(
128        url.as_str(),
129        method,
130        Some(body),
131        Some(auth_header),
132        content_type,
133        request_type,
134    )
135    .await?;
136
137    Ok(rough_result)
138}
139
140mod utils {
141    use anyhow::{Context, Result as AnyhowResult};
142    use base64::{Engine, prelude::BASE64_STANDARD};
143    use core::fmt;
144    use serde::de::DeserializeOwned;
145    use serde_json::{Value, to_string};
146    #[cfg(feature = "worker-v0-5")]
147    use worker::{Env, Fetch, Headers, Method, Request, RequestInit, wasm_bindgen::JsValue};
148    #[cfg(feature = "worker-v0-4")]
149    use worker_v4::{Env, Fetch, Headers, Method, Request, RequestInit, wasm_bindgen::JsValue};
150
151    pub fn extract_jwt_from_headers(req: &Request) -> AnyhowResult<String> {
152        let auth_header = req
153            .headers()
154            .get("Authorization")
155            .context("Failed to retrieve Authorization header")?;
156
157        let token = auth_header
158            .context("Authorization header missing")?
159            .strip_prefix("Bearer ")
160            .context("Authorization header must start with 'Bearer '")?
161            .trim()
162            .to_string();
163
164        Ok(token)
165    }
166
167    /// Authorization header options for HTTP requests.
168    #[allow(dead_code)]
169    #[derive(Debug)]
170    pub enum AuthorizationHeaders<'a> {
171        Bearer(&'a str),
172        Basic(&'a str, &'a str),
173    }
174
175    /// Supported content types for request bodies.
176    #[allow(dead_code)]
177    #[derive(Debug)]
178    pub enum ContentTypes {
179        Json,
180        XWWWFormUrlEncoded,
181    }
182
183    /// Type of request: internal service call or external HTTP request.
184    #[allow(dead_code)]
185    pub enum RequestType<'a> {
186        Internal(&'a str, &'a Env),
187        External,
188    }
189
190    impl<'a> fmt::Debug for RequestType<'a> {
191        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
192            match self {
193                RequestType::Internal(service_name, _ctx) => f
194                    .debug_tuple("Internal")
195                    .field(service_name)
196                    .field(&"<RouteContext>")
197                    .finish(),
198                RequestType::External => f.debug_tuple("External").finish(),
199            }
200        }
201    }
202
203    pub async fn request<'a, R: DeserializeOwned>(
204        url: &str,
205        method: Method,
206        body: Option<Value>,
207        auth_header: Option<AuthorizationHeaders<'a>>,
208        content_type: ContentTypes,
209        request_type: RequestType<'a>,
210    ) -> AnyhowResult<Option<R>> {
211        // Helper to append headers with error context
212        fn append_header(headers: &mut Headers, key: &str, value: &str) -> AnyhowResult<()> {
213            headers
214                .append(key, value)
215                .context(format!("Failed to append {} header", key))?;
216            Ok(())
217        }
218
219        // Prepare the body efficiently
220        let body = body
221            .map(|val| to_string(&val))
222            .transpose()?
223            .map(|s| JsValue::from_str(&s));
224
225        // Build headers
226        let mut headers = Headers::default();
227        if let Some(auth) = auth_header {
228            let auth_value = match auth {
229                AuthorizationHeaders::Bearer(token) => format!("Bearer {}", token),
230                AuthorizationHeaders::Basic(user, pass) => {
231                    let credentials = BASE64_STANDARD.encode(format!("{}:{}", user, pass));
232                    format!("Basic {}", credentials)
233                }
234            };
235            append_header(&mut headers, "Authorization", &auth_value)?;
236        }
237
238        let content_type_str = match content_type {
239            ContentTypes::Json => "application/json",
240            ContentTypes::XWWWFormUrlEncoded => "application/x-www-form-urlencoded",
241        };
242        append_header(&mut headers, "Content-Type", content_type_str)?;
243
244        // Construct the request
245        let req = Request::new_with_init(
246            url,
247            &RequestInit {
248                method,
249                body,
250                headers,
251                ..Default::default()
252            },
253        )
254        .context("Failed to create request")?;
255
256        // Send the request based on type
257        let mut resp = match request_type {
258            RequestType::Internal(service_name, env) => {
259                let fetcher = env
260                    .service(service_name)
261                    .context("Failed to get internal service binding")?;
262                fetcher.fetch_request(req).await
263            }
264            RequestType::External => Fetch::Request(req).send().await,
265        }
266        .context("Failed to send request")?;
267
268        // Optional: Check status code
269        let status = resp.status_code();
270        if !(200..=299).contains(&status) {
271            return Err(anyhow::anyhow!("Request failed with status: {}", status));
272        }
273
274        // Process the response
275        let text = resp.text().await.context("Failed to read response text")?;
276        if text.is_empty() {
277            Ok(None)
278        } else {
279            let deserialized = serde_json::from_str(&text)
280                .with_context(|| format!("Failed to deserialize response (status {}): {}", status, text))?;
281            Ok(Some(deserialized))
282        }
283    }
284}
285
// Example usage: deserializing events from the accepted JSON shapes.
#[cfg(test)]
mod tests {
    use crate::notifications::{NotificationEvent, NotificationType, Priority};

    /// Event data supplied under the `payloads` key.
    #[test]
    fn test_fluvio_deserialization() {
        let raw = r#"{
            "type": "sync.disputer.disputes.created",
            "recipient": { "user_id": "user123" },
            "notification_type": ["web"],
            "payloads": [
                {
                    "amount": 130.78,
                    "charge": "ch_3QDoFsJkaap6UV2A0XFo8B8r"
                }
            ]
        }"#;

        let parsed: NotificationEvent = serde_json::from_str(raw).unwrap();

        assert_eq!(parsed.key.as_str(), "sync.disputer.disputes.created");
        assert_eq!(parsed.payloads.len(), 1);
        assert_eq!(parsed.recipient.user_id, "user123");
    }

    /// Event data supplied under the `context` alias, with an explicit
    /// priority. `context` is an array because `payloads` is a `Vec`.
    #[test]
    fn test_modern_deserialization() {
        let raw = r#"{
            "type": "account.welcome",
            "recipient": {"user_id": "user123", "email": "user@example.com"},
            "notification_type": ["email"],
            "context": [{ "name": "John" }],
            "priority": "high"
        }"#;

        let parsed: NotificationEvent = serde_json::from_str(raw).unwrap();

        assert_eq!(parsed.key.as_str(), "account.welcome");
        assert_eq!(parsed.recipient.user_id, "user123");
        assert_eq!(parsed.notification_type, vec![NotificationType::Email]);
        assert_eq!(parsed.payloads.len(), 1);
        assert_eq!(parsed.priority, Priority::High);
    }
}
342}