use serde::{Deserialize, Serialize};
use std::time::{SystemTime, UNIX_EPOCH};
use tracing::{info, warn, error, instrument};
/// Runtime settings for the notification service.
///
/// `Debug` is implemented by hand (instead of derived) so that the Supabase
/// service-role key is never written to logs or panic messages when a
/// configuration value is debug-printed.
#[derive(Clone)]
pub struct NotificationConfig {
    /// Base URL of the Supabase project; REST and edge-function paths are
    /// appended to it.
    pub supabase_url: String,
    /// Service-role key sent as the `apikey` / `Authorization: Bearer` credential.
    pub supabase_service_key: String,
    /// Name of the DynamoDB table used as the push-token cache.
    pub dynamo_table: String,
    /// Lifetime of a cached push token, in seconds (added to the Unix
    /// timestamp at which the token is cached).
    pub token_cache_ttl: i64,
}

impl std::fmt::Debug for NotificationConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("NotificationConfig")
            .field("supabase_url", &self.supabase_url)
            // Never print the secret itself.
            .field("supabase_service_key", &"<redacted>")
            .field("dynamo_table", &self.dynamo_table)
            .field("token_cache_ttl", &self.token_cache_ttl)
            .finish()
    }
}
impl NotificationConfig {
    /// Builds the configuration from process environment variables.
    ///
    /// * `SUPABASE_URL` and `SUPABASE_SERVICE_ROLE_KEY` are mandatory.
    /// * `DYNAMO_TABLE` is optional and falls back to `"loop-agent-state"`.
    /// * The token cache lifetime is fixed at seven days (in seconds).
    ///
    /// # Errors
    ///
    /// Returns [`NotificationError::MissingConfig`] naming the first mandatory
    /// variable that could not be read.
    pub fn from_env() -> Result<Self, NotificationError> {
        const SECONDS_PER_DAY: i64 = 86400;
        const CACHE_DAYS: i64 = 7;

        // Reads a mandatory variable, reporting its name when it is unavailable.
        let required = |key: &str| -> Result<String, NotificationError> {
            std::env::var(key).map_err(|_| NotificationError::MissingConfig(key.into()))
        };

        let supabase_url = required("SUPABASE_URL")?;
        let supabase_service_key = required("SUPABASE_SERVICE_ROLE_KEY")?;
        let dynamo_table = match std::env::var("DYNAMO_TABLE") {
            Ok(table) => table,
            Err(_) => "loop-agent-state".into(),
        };

        Ok(Self {
            supabase_url,
            supabase_service_key,
            dynamo_table,
            token_cache_ttl: SECONDS_PER_DAY * CACHE_DAYS,
        })
    }
}
/// Sends push notifications to users.
///
/// Push tokens are looked up in a DynamoDB table first and fetched from
/// Supabase on a miss; delivery goes through the Supabase `send-push`
/// edge function.
pub struct NotificationService {
/// Supabase endpoint/credentials, DynamoDB table name and cache lifetime.
config: NotificationConfig,
/// HTTP client used for every Supabase request.
http_client: reqwest::Client,
/// DynamoDB client used to read, write and delete cached push tokens.
dynamo_client: aws_sdk_dynamodb::Client,
}
impl NotificationService {
/// Creates a service from `config`.
///
/// AWS settings are loaded through `aws_config::load_defaults`, and the HTTP
/// client is built with a 10-second request timeout.
///
/// # Errors
///
/// Returns [`NotificationError::InitError`] when the HTTP client cannot be built.
pub async fn new(config: NotificationConfig) -> Result<Self, NotificationError> {
    const HTTP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);

    let sdk_config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await;
    let dynamo_client = aws_sdk_dynamodb::Client::new(&sdk_config);

    let http_client = match reqwest::Client::builder().timeout(HTTP_TIMEOUT).build() {
        Ok(client) => client,
        Err(build_err) => return Err(NotificationError::InitError(build_err.to_string())),
    };

    Ok(Self {
        config,
        http_client,
        dynamo_client,
    })
}
#[instrument(skip(self))]
pub async fn send(
&self,
user_pubkey: &str,
notification: &PushNotification,
) -> Result<NotificationResult, NotificationError> {
let push_token = match self.get_cached_token(user_pubkey).await? {
Some(token) => {
info!(user = %user_pubkey, "Push token found in cache");
token
}
None => {
info!(user = %user_pubkey, "Cache miss, querying Supabase");
let token = self.fetch_token_from_supabase(user_pubkey).await?;
if let Some(ref t) = token {
self.cache_token(user_pubkey, t).await?;
}
token.ok_or(NotificationError::NoToken)?
}
};
self.send_via_supabase(&push_token, user_pubkey, notification).await
}
/// Notifies a user that Cred was captured at a merchant.
///
/// Builds a high-priority notification with the default sound whose body
/// shows `cred_amount` (two decimals) and `merchant_name`, attaches a
/// `"capture"` data payload carrying both values, and hands it to
/// [`Self::send`].
///
/// # Errors
///
/// Propagates any error returned by [`Self::send`].
pub async fn send_capture_notification(
    &self,
    user_pubkey: &str,
    cred_amount: f64,
    merchant_name: &str,
) -> Result<NotificationResult, NotificationError> {
    let notification = PushNotification {
        title: "Cred Captured! 🎉".into(),
        body: format!(
            "You earned {:.2} Cred at {}. Tap to see it grow.",
            cred_amount,
            merchant_name
        ),
        data: Some(NotificationData {
            notification_type: "capture".into(),
            cred_amount: Some(cred_amount),
            merchant_name: Some(merchant_name.into()),
            ..Default::default()
        }),
        sound: Some("default".into()),
        priority: NotificationPriority::High,
        badge: None,
    };
    // `send` borrows the notification (`&PushNotification`).
    self.send(user_pubkey, &notification).await
}
/// Notifies a user that Cred was automatically staked.
///
/// Builds a normal-priority, silent notification whose body shows
/// `cred_amount` (two decimals), `duration_days` and `apy` (one decimal),
/// attaches a `"stake"` data payload carrying the amount, and hands it to
/// [`Self::send`].
///
/// # Errors
///
/// Propagates any error returned by [`Self::send`].
pub async fn send_stake_notification(
    &self,
    user_pubkey: &str,
    cred_amount: f64,
    duration_days: u16,
    apy: f64,
) -> Result<NotificationResult, NotificationError> {
    let notification = PushNotification {
        title: "Auto-Staked 📈".into(),
        body: format!(
            "{:.2} Cred staked for {} days at {:.1}% APY",
            cred_amount, duration_days, apy
        ),
        data: Some(NotificationData {
            notification_type: "stake".into(),
            cred_amount: Some(cred_amount),
            ..Default::default()
        }),
        sound: None,
        priority: NotificationPriority::Normal,
        badge: None,
    };
    // `send` borrows the notification (`&PushNotification`).
    self.send(user_pubkey, &notification).await
}
/// Notifies a user that a staked position is about to unlock.
///
/// Builds a high-priority notification with the default sound whose body
/// shows `cred_amount` and `yield_amount` (two decimals each) and
/// `hours_until`, attaches an `"unlock"` data payload carrying the staked
/// amount, and hands it to [`Self::send`].
///
/// # Errors
///
/// Propagates any error returned by [`Self::send`].
pub async fn send_unlock_notification(
    &self,
    user_pubkey: &str,
    cred_amount: f64,
    yield_amount: f64,
    hours_until: u16,
) -> Result<NotificationResult, NotificationError> {
    let notification = PushNotification {
        title: "Position Unlocking Soon ⏰".into(),
        body: format!(
            "Your {:.2} Cred stake unlocks in {} hours with {:.2} Cred yield!",
            cred_amount, hours_until, yield_amount
        ),
        data: Some(NotificationData {
            notification_type: "unlock".into(),
            cred_amount: Some(cred_amount),
            ..Default::default()
        }),
        sound: Some("default".into()),
        priority: NotificationPriority::High,
        badge: None,
    };
    // `send` borrows the notification (`&PushNotification`).
    self.send(user_pubkey, &notification).await
}
/// Looks up the cached push token for `user_pubkey` in DynamoDB.
///
/// The item is addressed by `pk = "USER#<pubkey>"` and `sk = "PUSH_TOKEN"`.
/// Returns `Ok(None)` when there is no item, when its numeric `ttl`
/// attribute lies before the current Unix time (an unparseable `ttl` is read
/// as `0`), or when it has no string `push_token` attribute. An item without
/// a numeric `ttl` attribute is not subjected to the expiry check.
///
/// # Errors
///
/// Returns [`NotificationError::DynamoError`] if the `GetItem` call fails.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch.
async fn get_cached_token(&self, user_pubkey: &str) -> Result<Option<String>, NotificationError> {
    use aws_sdk_dynamodb::types::AttributeValue;

    let partition_key = AttributeValue::S(format!("USER#{}", user_pubkey));
    let sort_key = AttributeValue::S("PUSH_TOKEN".into());

    let lookup = self
        .dynamo_client
        .get_item()
        .table_name(&self.config.dynamo_table)
        .key("pk", partition_key)
        .key("sk", sort_key)
        .send()
        .await;
    let output = match lookup {
        Ok(output) => output,
        Err(e) => return Err(NotificationError::DynamoError(e.to_string())),
    };

    let record = match output.item {
        Some(record) => record,
        None => return Ok(None),
    };

    // Reject entries whose recorded expiry has already passed.
    if let Some(AttributeValue::N(raw_expiry)) = record.get("ttl") {
        let expires_at = raw_expiry.parse::<i64>().unwrap_or(0);
        let current = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs() as i64;
        if expires_at < current {
            return Ok(None);
        }
    }

    Ok(match record.get("push_token") {
        Some(AttributeValue::S(cached)) => Some(cached.clone()),
        _ => None,
    })
}
/// Stores `token` as the cached push token for `user_pubkey`.
///
/// Writes an item keyed by `pk = "USER#<pubkey>"` / `sk = "PUSH_TOKEN"`
/// holding the token, the current Unix time (`cached_at`) and an expiry
/// (`ttl`) of now plus the configured cache lifetime.
///
/// # Errors
///
/// Returns [`NotificationError::DynamoError`] if the `PutItem` call fails.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch.
async fn cache_token(&self, user_pubkey: &str, token: &str) -> Result<(), NotificationError> {
    use aws_sdk_dynamodb::types::AttributeValue;

    let cached_at = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_secs() as i64;
    let expires_at = cached_at + self.config.token_cache_ttl;

    let attributes = [
        ("pk", AttributeValue::S(format!("USER#{}", user_pubkey))),
        ("sk", AttributeValue::S("PUSH_TOKEN".into())),
        ("push_token", AttributeValue::S(token.into())),
        ("cached_at", AttributeValue::N(cached_at.to_string())),
        ("ttl", AttributeValue::N(expires_at.to_string())),
    ];

    let mut put_request = self
        .dynamo_client
        .put_item()
        .table_name(&self.config.dynamo_table);
    for (name, value) in attributes {
        put_request = put_request.item(name, value);
    }

    if let Err(e) = put_request.send().await {
        return Err(NotificationError::DynamoError(e.to_string()));
    }

    info!(user = %user_pubkey, "Push token cached in DynamoDB");
    Ok(())
}
async fn fetch_token_from_supabase(&self, user_pubkey: &str) -> Result<Option<String>, NotificationError> {
let url = format!(
"{}/rest/v1/profiles?select=push_token&wallet_address=eq.{}",
self.config.supabase_url,
user_pubkey
);
let response = self.http_client
.get(&url)
.header("apikey", &self.config.supabase_service_key)
.header("Authorization", format!("Bearer {}", self.config.supabase_service_key))
.send()
.await
.map_err(|e| NotificationError::SupabaseError(e.to_string()))?;
if !response.status().is_success() {
let status = response.status();
let body = response.text().await.unwrap_or_default();
error!(status = %status, body = %body, "Supabase query failed");
return Err(NotificationError::SupabaseError(format!("HTTP {}: {}", status, body)));
}
#[derive(Deserialize)]
struct ProfileRow {
push_token: Option<String>,
}
let profiles: Vec<ProfileRow> = response.json().await
.map_err(|e| NotificationError::SupabaseError(e.to_string()))?;
Ok(profiles.first().and_then(|p| p.push_token.clone()))
}
/// Posts the notification to the Supabase `send-push` edge function.
///
/// The request body is a [`SendPushPayload`] built from `push_token` and
/// `notification`; the response is decoded as a [`SendPushResponse`].
///
/// * When the response reports `sent`, returns a successful
///   [`NotificationResult`] carrying the ticket id.
/// * Otherwise returns `Ok` with `sent: false` and the reported error. If
///   that error is `"DeviceNotRegistered"`, the cached token is removed; a
///   failure to remove it is logged but does not change the result.
///
/// # Errors
///
/// Returns [`NotificationError::SupabaseError`] when the request fails or the
/// response body cannot be decoded; the decode error includes the HTTP
/// status so non-JSON error responses can be diagnosed.
async fn send_via_supabase(
    &self,
    push_token: &str,
    user_pubkey: &str,
    notification: &PushNotification,
) -> Result<NotificationResult, NotificationError> {
    let url = format!("{}/functions/v1/send-push", self.config.supabase_url);
    let priority = match notification.priority {
        NotificationPriority::High => "high",
        NotificationPriority::Normal => "normal",
    };
    let payload = SendPushPayload {
        push_token: push_token.into(),
        user_id: None,
        title: Some(notification.title.clone()),
        body: Some(notification.body.clone()),
        data: notification.data.clone(),
        sound: notification.sound.clone(),
        priority: priority.into(),
        badge: notification.badge,
    };

    let response = self.http_client
        .post(&url)
        .header("Authorization", format!("Bearer {}", self.config.supabase_service_key))
        .header("Content-Type", "application/json")
        .json(&payload)
        .send()
        .await
        .map_err(|e| NotificationError::SupabaseError(e.to_string()))?;

    // Capture the status before the body is consumed so it can be reported.
    let status = response.status();
    let body: SendPushResponse = response.json().await
        .map_err(|e| NotificationError::SupabaseError(format!("HTTP {}: {}", status, e)))?;

    if body.sent {
        info!(
            user = %user_pubkey,
            ticket_id = ?body.ticket_id,
            "Push notification sent"
        );
        return Ok(NotificationResult {
            sent: true,
            ticket_id: body.ticket_id,
            error: None,
        });
    }

    warn!(
        user = %user_pubkey,
        status = %status,
        error = ?body.error,
        "Push notification failed"
    );

    // A token the push provider no longer recognises must not stay cached.
    if body.error.as_deref() == Some("DeviceNotRegistered") {
        if let Err(err) = self.clear_cached_token(user_pubkey).await {
            warn!(
                user = %user_pubkey,
                error = %err,
                "Failed to clear invalid push token from cache"
            );
        }
    }

    Ok(NotificationResult {
        sent: false,
        ticket_id: None,
        error: body.error,
    })
}
/// Deletes the cached push token item for `user_pubkey` from DynamoDB.
///
/// The item is addressed by `pk = "USER#<pubkey>"` and `sk = "PUSH_TOKEN"`.
///
/// # Errors
///
/// Returns [`NotificationError::DynamoError`] if the `DeleteItem` call fails.
async fn clear_cached_token(&self, user_pubkey: &str) -> Result<(), NotificationError> {
    use aws_sdk_dynamodb::types::AttributeValue;

    let partition_key = AttributeValue::S(format!("USER#{}", user_pubkey));
    let sort_key = AttributeValue::S("PUSH_TOKEN".into());

    let outcome = self
        .dynamo_client
        .delete_item()
        .table_name(&self.config.dynamo_table)
        .key("pk", partition_key)
        .key("sk", sort_key)
        .send()
        .await;

    match outcome {
        Ok(_) => {
            info!(user = %user_pubkey, "Cleared invalid push token from cache");
            Ok(())
        }
        Err(e) => Err(NotificationError::DynamoError(e.to_string())),
    }
}
}
/// A push notification to deliver to a user's device.
///
/// `send_via_supabase` copies these fields into the request payload sent to
/// the Supabase `send-push` edge function.
#[derive(Debug, Clone, Serialize)]
pub struct PushNotification {
/// Notification title.
pub title: String,
/// Notification body text.
pub body: String,
/// Optional structured payload attached to the notification.
pub data: Option<NotificationData>,
/// Optional sound name; the builders in this file use `"default"` or `None`.
pub sound: Option<String>,
/// Delivery priority.
pub priority: NotificationPriority,
/// Optional badge value; the builders in this file leave it unset.
pub badge: Option<u32>,
}
/// Delivery priority of a [`PushNotification`].
///
/// `send_via_supabase` maps `High` to the string `"high"` and `Normal` to
/// `"normal"` in the outgoing payload.
#[derive(Debug, Clone, Copy, Serialize)]
pub enum NotificationPriority {
/// Sent as `"high"`.
High,
/// Sent as `"normal"`.
Normal,
}
/// Structured payload attached to a push notification.
///
/// Optional fields that are `None` are left out of the serialized output.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct NotificationData {
/// Notification kind, serialized under the key `type`; this file uses
/// `"capture"`, `"stake"` and `"unlock"`.
#[serde(rename = "type")]
pub notification_type: String,
/// Cred amount the notification refers to, if any.
#[serde(skip_serializing_if = "Option::is_none")]
pub cred_amount: Option<f64>,
/// Merchant name; set by the capture notification.
#[serde(skip_serializing_if = "Option::is_none")]
pub merchant_name: Option<String>,
/// Position identifier; not set by any builder in this file.
#[serde(skip_serializing_if = "Option::is_none")]
pub position_id: Option<String>,
/// Transaction identifier; not set by any builder in this file.
#[serde(skip_serializing_if = "Option::is_none")]
pub transaction_id: Option<String>,
}
/// Outcome of a send attempt, built from the `send-push` response.
#[derive(Debug, Clone)]
pub struct NotificationResult {
/// Whether the response reported the notification as sent.
pub sent: bool,
/// Ticket id from the response; only populated when `sent` is `true`.
pub ticket_id: Option<String>,
/// Error string from the response; only populated when `sent` is `false`.
pub error: Option<String>,
}
/// JSON request body posted to the Supabase `send-push` edge function.
///
/// Optional fields that are `None` are omitted from the serialized body.
#[derive(Debug, Serialize)]
struct SendPushPayload {
/// Push token of the target device.
push_token: String,
/// Optional user id; `send_via_supabase` always leaves this as `None`.
#[serde(skip_serializing_if = "Option::is_none")]
user_id: Option<String>,
/// Notification title.
#[serde(skip_serializing_if = "Option::is_none")]
title: Option<String>,
/// Notification body text.
#[serde(skip_serializing_if = "Option::is_none")]
body: Option<String>,
/// Structured payload forwarded from the notification.
#[serde(skip_serializing_if = "Option::is_none")]
data: Option<NotificationData>,
/// Sound name forwarded from the notification.
#[serde(skip_serializing_if = "Option::is_none")]
sound: Option<String>,
/// Priority string: `"high"` or `"normal"`.
priority: String,
/// Badge value forwarded from the notification.
#[serde(skip_serializing_if = "Option::is_none")]
badge: Option<u32>,
}
/// JSON response body decoded from the Supabase `send-push` edge function.
#[derive(Debug, Deserialize)]
struct SendPushResponse {
/// Whether the function reports the notification as sent.
sent: bool,
/// Ticket id reported alongside a successful send, if any.
ticket_id: Option<String>,
/// Error string reported on failure; `"DeviceNotRegistered"` triggers
/// removal of the cached token in `send_via_supabase`.
error: Option<String>,
}
/// Errors produced by the notification service.
#[derive(Debug, Clone)]
pub enum NotificationError {
    /// A mandatory configuration value is missing; holds its name.
    MissingConfig(String),
    /// The service could not be initialised; holds the underlying message.
    InitError(String),
    /// A DynamoDB request failed; holds the underlying message.
    DynamoError(String),
    /// A Supabase request failed or returned an unusable response.
    SupabaseError(String),
    /// No push token is available for the user.
    NoToken,
}

impl std::fmt::Display for NotificationError {
    /// Renders the error as `"<category>: <detail>"`, or a fixed sentence for
    /// [`NotificationError::NoToken`].
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (category, detail) = match self {
            Self::NoToken => return f.write_str("User has no push token"),
            Self::MissingConfig(key) => ("Missing config", key),
            Self::InitError(msg) => ("Init error", msg),
            Self::DynamoError(msg) => ("DynamoDB error", msg),
            Self::SupabaseError(msg) => ("Supabase error", msg),
        };
        write!(f, "{}: {}", category, detail)
    }
}

impl std::error::Error for NotificationError {}
#[cfg(test)]
mod tests {
    use super::*;

    /// Serializing a capture payload must include the type, amount and merchant.
    #[test]
    fn notification_data_serializes() {
        let payload = NotificationData {
            merchant_name: Some("Miami Coffee".into()),
            cred_amount: Some(4.95),
            notification_type: "capture".into(),
            ..Default::default()
        };

        let encoded = serde_json::to_string(&payload).unwrap();

        for fragment in ["capture", "4.95", "Miami Coffee"] {
            assert!(encoded.contains(fragment));
        }
    }
}