use crate::event_keys::EventKey;
use anyhow::Result as AnyhowResult;
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
#[cfg(feature = "worker-v0-5")]
use worker::{Env, Request};
#[cfg(feature = "worker-v0-4")]
use worker_v4::{Env, Request};
/// Delivery channel over which a notification can be sent.
///
/// Variants are (de)serialized in lowercase: `email`, `push`, `sms`, `web`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum NotificationType {
/// E-mail delivery; `NotificationEvent::validate` requires `Recipient::email`.
Email,
/// Push delivery; `NotificationEvent::validate` requires `Recipient::push_token`.
Push,
/// SMS delivery; `NotificationEvent::validate` requires `Recipient::phone_number`.
Sms,
/// Web delivery; `NotificationEvent::validate` requires no contact detail for it.
Web,
}
/// The user a notification is addressed to, plus optional per-channel contact details.
///
/// Each optional field is left out of the serialized output when it is `None`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Recipient {
/// Identifier of the target user.
pub user_id: String,
/// E-mail address; needed when the event includes `NotificationType::Email`.
#[serde(skip_serializing_if = "Option::is_none")]
pub email: Option<String>,
/// Phone number; needed when the event includes `NotificationType::Sms`.
#[serde(skip_serializing_if = "Option::is_none")]
pub phone_number: Option<String>,
/// Push token; needed when the event includes `NotificationType::Push`.
#[serde(skip_serializing_if = "Option::is_none")]
pub push_token: Option<String>,
}
/// Priority attached to a notification event.
///
/// Variants are (de)serialized in lowercase: `high`, `normal`, `low`.
/// `Normal` is what `default_priority` returns when no priority is supplied.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Priority {
/// Serialized as `high`.
High,
/// Serialized as `normal`; the fallback value.
Normal,
/// Serialized as `low`.
Low,
}
/// Returns the fallback priority, [`Priority::Normal`].
///
/// Used as the serde default for `NotificationEvent::priority` and by
/// `NotificationEvent::new` when the caller passes `None`.
fn default_priority() -> Priority {
    Priority::Normal
}
/// A notification addressed to a single recipient over one or more channels.
///
/// This is the JSON document exchanged with the notifications service.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NotificationEvent {
    /// Event key; appears under the JSON key `type`.
    #[serde(rename = "type")]
    pub key: EventKey,
    /// Who the notification is for.
    pub recipient: Recipient,
    /// Channels to deliver on. The field name already matches the JSON key,
    /// so no explicit `rename` is needed.
    pub notification_type: Vec<NotificationType>,
    /// Free-form JSON values accompanying the event.
    ///
    /// Always written as `payloads`; on input the key `context` is accepted
    /// as an alternative. Defaults to an empty list when neither key is present.
    #[serde(default, alias = "context")]
    pub payloads: Vec<JsonValue>,
    /// Delivery priority; falls back to `default_priority()` when absent from the input.
    #[serde(default = "default_priority")]
    pub priority: Priority,
}
impl NotificationEvent {
pub fn new(
key: EventKey,
recipient: Recipient,
notification_type: Vec<NotificationType>,
payloads: Vec<JsonValue>,
priority: Option<Priority>,
) -> Self {
NotificationEvent {
key,
recipient,
notification_type,
payloads,
priority: priority.unwrap_or_else(default_priority),
}
}
pub fn validate(&self) -> Result<(), String> {
if self.notification_type.is_empty() {
return Err("At least one notification type is required".to_string());
}
for ty in &self.notification_type {
match ty {
NotificationType::Email => {
if self.recipient.email.is_none() {
return Err("Email notification requires an email address".to_string());
}
}
NotificationType::Sms => {
if self.recipient.phone_number.is_none() {
return Err("SMS notification requires a phone number".to_string());
}
}
NotificationType::Push => {
if self.recipient.push_token.is_none() {
return Err("Push notification requires a push token".to_string());
}
}
NotificationType::Web => {}
}
}
Ok(())
}
pub fn has_notification(&self, target: NotificationType) -> bool {
self.notification_type.contains(&target)
}
}
pub async fn send_notification(
notification: NotificationEvent,
notifications_service_url: String,
req: &Request,
env: &Env,
) -> AnyhowResult<Option<JsonValue>> {
use utils::*;
#[cfg(feature = "worker-v0-5")]
use worker::Method;
#[cfg(feature = "worker-v0-4")]
use worker_v4::Method;
let url = notifications_service_url + "/dispatch";
let jwt = utils::extract_jwt_from_headers(req)?;
let auth_header = AuthorizationHeaders::Bearer(jwt.as_str());
let method = Method::Post;
let body = serde_json::to_value(¬ification)?;
let content_type = ContentTypes::Json;
let request_type = RequestType::Internal("notifications", &env);
let rough_result = request::<JsonValue>(
url.as_str(),
method,
Some(body),
Some(auth_header),
content_type,
request_type,
)
.await?;
Ok(rough_result)
}
/// Private HTTP helpers: bearer-token extraction and a small JSON request
/// wrapper that can target either a service binding or the public network.
mod utils {
    use anyhow::{Context, Result as AnyhowResult};
    use base64::{Engine, prelude::BASE64_STANDARD};
    use core::fmt;
    use serde::de::DeserializeOwned;
    use serde_json::{Value, to_string};
    #[cfg(feature = "worker-v0-5")]
    use worker::{Env, Fetch, Headers, Method, Request, RequestInit, wasm_bindgen::JsValue};
    #[cfg(feature = "worker-v0-4")]
    use worker_v4::{Env, Fetch, Headers, Method, Request, RequestInit, wasm_bindgen::JsValue};

    /// Extracts the bearer token from the request's `Authorization` header.
    ///
    /// The `Bearer ` prefix is matched case-sensitively and surrounding
    /// whitespace is trimmed from the token.
    ///
    /// # Errors
    ///
    /// Fails when the header cannot be read, is absent, does not start with
    /// `Bearer `, or carries an empty token.
    pub fn extract_jwt_from_headers(req: &Request) -> AnyhowResult<String> {
        let header_value = req
            .headers()
            .get("Authorization")
            .context("Failed to retrieve Authorization header")?
            .context("Authorization header missing")?;
        let token = header_value
            .strip_prefix("Bearer ")
            .context("Authorization header must start with 'Bearer '")?
            .trim();
        // An empty token can never authenticate; reject it here rather than
        // forwarding a meaningless `Bearer ` header downstream.
        anyhow::ensure!(
            !token.is_empty(),
            "Authorization header contains an empty bearer token"
        );
        Ok(token.to_string())
    }

    /// Credentials to place in the outgoing `Authorization` header.
    // Not every variant is constructed inside this crate.
    #[allow(dead_code)]
    #[derive(Debug)]
    pub enum AuthorizationHeaders<'a> {
        /// Sent as `Bearer <token>`.
        Bearer(&'a str),
        /// Username and password, sent as `Basic base64(user:pass)`.
        Basic(&'a str, &'a str),
    }

    /// Value for the outgoing `Content-Type` header.
    ///
    /// NOTE(review): `request` always serializes the body as JSON text,
    /// whichever variant is chosen here. Confirm that callers using
    /// `XWWWFormUrlEncoded` expect that.
    // Not every variant is constructed inside this crate.
    #[allow(dead_code)]
    #[derive(Debug)]
    pub enum ContentTypes {
        /// `application/json`
        Json,
        /// `application/x-www-form-urlencoded`
        XWWWFormUrlEncoded,
    }

    /// How the outgoing request is dispatched.
    // Not every variant is constructed inside this crate.
    #[allow(dead_code)]
    pub enum RequestType<'a> {
        /// Send through the service binding with the given name, looked up on the `Env`.
        Internal(&'a str, &'a Env),
        /// Send with a plain `Fetch`.
        External,
    }

    impl<'a> fmt::Debug for RequestType<'a> {
        /// Prints the variant and the service name; the `Env` is shown as an
        /// opaque `<Env>` placeholder instead of being formatted.
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            match self {
                RequestType::Internal(service_name, _env) => f
                    .debug_tuple("Internal")
                    .field(service_name)
                    .field(&format_args!("<Env>"))
                    .finish(),
                RequestType::External => f.debug_tuple("External").finish(),
            }
        }
    }

    /// Sends an HTTP request and decodes a JSON response body into `R`.
    ///
    /// * `url` - absolute URL of the target.
    /// * `method` - HTTP method to use.
    /// * `body` - optional JSON value, sent as its serialized text.
    /// * `auth_header` - optional credentials for the `Authorization` header.
    /// * `content_type` - value for the `Content-Type` header (always set).
    /// * `request_type` - service binding or plain fetch.
    ///
    /// # Returns
    ///
    /// `Ok(Some(value))` when the response has a body, `Ok(None)` when the
    /// body is empty or contains only whitespace.
    ///
    /// # Errors
    ///
    /// Fails when the body cannot be serialized, a header cannot be set, the
    /// request cannot be built or sent, the service binding is unavailable,
    /// the status is outside `200..=299`, or the body is not valid JSON for `R`.
    pub async fn request<'a, R: DeserializeOwned>(
        url: &str,
        method: Method,
        body: Option<Value>,
        auth_header: Option<AuthorizationHeaders<'a>>,
        content_type: ContentTypes,
        request_type: RequestType<'a>,
    ) -> AnyhowResult<Option<R>> {
        /// Appends one header, naming the header in the error on failure.
        fn append_header(headers: &mut Headers, key: &str, value: &str) -> AnyhowResult<()> {
            headers
                .append(key, value)
                // Build the message lazily; the success path allocates nothing.
                .with_context(|| format!("Failed to append {} header", key))
        }

        let body = body
            .map(|val| to_string(&val))
            .transpose()
            .context("Failed to serialize request body")?
            .map(|s| JsValue::from_str(&s));

        let mut headers = Headers::default();
        if let Some(auth) = auth_header {
            let auth_value = match auth {
                AuthorizationHeaders::Bearer(token) => format!("Bearer {}", token),
                AuthorizationHeaders::Basic(user, pass) => {
                    let credentials = BASE64_STANDARD.encode(format!("{}:{}", user, pass));
                    format!("Basic {}", credentials)
                }
            };
            append_header(&mut headers, "Authorization", &auth_value)?;
        }
        let content_type_str = match content_type {
            ContentTypes::Json => "application/json",
            ContentTypes::XWWWFormUrlEncoded => "application/x-www-form-urlencoded",
        };
        append_header(&mut headers, "Content-Type", content_type_str)?;

        let req = Request::new_with_init(
            url,
            &RequestInit {
                method,
                body,
                headers,
                ..Default::default()
            },
        )
        .context("Failed to create request")?;

        let mut resp = match request_type {
            RequestType::Internal(service_name, env) => {
                let fetcher = env
                    .service(service_name)
                    .context("Failed to get internal service binding")?;
                fetcher.fetch_request(req).await
            }
            RequestType::External => Fetch::Request(req).send().await,
        }
        .context("Failed to send request")?;

        let status = resp.status_code();
        if !(200..=299).contains(&status) {
            return Err(anyhow::anyhow!("Request failed with status: {}", status));
        }

        let text = resp.text().await.context("Failed to read response text")?;
        // A body of only whitespace carries no JSON document; treat it like
        // an empty body instead of surfacing a parse error.
        if text.trim().is_empty() {
            return Ok(None);
        }
        let deserialized = serde_json::from_str::<R>(&text).with_context(|| {
            format!("Failed to deserialize response (status {}): {}", status, text)
        })?;
        Ok(Some(deserialized))
    }
}
/// Deserialization tests for `NotificationEvent`, covering both accepted
/// spellings of the payload key.
#[cfg(test)]
mod tests {
    // `super` keeps the tests independent of where this file sits in the crate.
    use super::{NotificationEvent, NotificationType, Priority};

    /// Payload supplied under `payloads`, with no explicit priority.
    #[test]
    fn test_fluvio_deserialization() {
        let json = r#"{
            "type": "sync.disputer.disputes.created",
            "recipient": { "user_id": "user123" },
            "notification_type": ["web"],
            "payloads": [
                {
                    "amount": 130.78,
                    "charge": "ch_3QDoFsJkaap6UV2A0XFo8B8r"
                }
            ]
        }"#;
        let event: NotificationEvent = serde_json::from_str(json).unwrap();
        assert_eq!(event.key.as_str(), "sync.disputer.disputes.created");
        assert_eq!(event.payloads.len(), 1);
        assert_eq!(event.recipient.user_id, "user123");
        assert_eq!(event.notification_type, vec![NotificationType::Web]);
        // A missing priority must fall back to the default.
        assert_eq!(event.priority, Priority::Normal);
        // Web delivery needs no contact detail, so the event is valid as-is.
        assert_eq!(event.validate(), Ok(()));
    }

    /// Payload supplied under the alternate `context` key, with an explicit priority.
    #[test]
    fn test_modern_deserialization() {
        let json = r#"{
            "type": "account.welcome",
            "recipient": {"user_id": "user123", "email": "user@example.com"},
            "notification_type": ["email"],
            "context": [{ "name": "John" }],
            "priority": "high"
        }"#;
        let event: NotificationEvent = serde_json::from_str(json).unwrap();
        assert_eq!(event.key.as_str(), "account.welcome");
        assert_eq!(event.recipient.user_id, "user123");
        assert_eq!(event.notification_type, vec![NotificationType::Email]);
        assert_eq!(event.payloads.len(), 1);
        assert_eq!(event.priority, Priority::High);
        // The recipient carries an email address, so e-mail delivery validates.
        assert_eq!(event.validate(), Ok(()));
    }
}