1use serde::{Deserialize, Serialize};
12use std::time::{SystemTime, UNIX_EPOCH};
13use tracing::{info, warn, error, instrument};
14
15#[derive(Debug, Clone)]
21pub struct NotificationConfig {
22 pub supabase_url: String,
24 pub supabase_service_key: String,
26 pub dynamo_table: String,
28 pub token_cache_ttl: i64,
30}
31
32impl NotificationConfig {
33 pub fn from_env() -> Result<Self, NotificationError> {
35 Ok(Self {
36 supabase_url: std::env::var("SUPABASE_URL")
37 .map_err(|_| NotificationError::MissingConfig("SUPABASE_URL".into()))?,
38 supabase_service_key: std::env::var("SUPABASE_SERVICE_ROLE_KEY")
39 .map_err(|_| NotificationError::MissingConfig("SUPABASE_SERVICE_ROLE_KEY".into()))?,
40 dynamo_table: std::env::var("DYNAMO_TABLE")
41 .unwrap_or_else(|_| "loop-agent-state".into()),
42 token_cache_ttl: 86400 * 7, })
44 }
45}
46
47pub struct NotificationService {
53 config: NotificationConfig,
54 http_client: reqwest::Client,
55 dynamo_client: aws_sdk_dynamodb::Client,
56}
57
58impl NotificationService {
59 pub async fn new(config: NotificationConfig) -> Result<Self, NotificationError> {
61 let aws_config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await;
62 let dynamo_client = aws_sdk_dynamodb::Client::new(&aws_config);
63
64 let http_client = reqwest::Client::builder()
65 .timeout(std::time::Duration::from_secs(10))
66 .build()
67 .map_err(|e| NotificationError::InitError(e.to_string()))?;
68
69 Ok(Self {
70 config,
71 http_client,
72 dynamo_client,
73 })
74 }
75
76 #[instrument(skip(self))]
84 pub async fn send(
85 &self,
86 user_pubkey: &str,
87 notification: &PushNotification,
88 ) -> Result<NotificationResult, NotificationError> {
89 let push_token = match self.get_cached_token(user_pubkey).await? {
91 Some(token) => {
92 info!(user = %user_pubkey, "Push token found in cache");
93 token
94 }
95 None => {
96 info!(user = %user_pubkey, "Cache miss, querying Supabase");
98 let token = self.fetch_token_from_supabase(user_pubkey).await?;
99
100 if let Some(ref t) = token {
102 self.cache_token(user_pubkey, t).await?;
103 }
104
105 token.ok_or(NotificationError::NoToken)?
106 }
107 };
108
109 self.send_via_supabase(&push_token, user_pubkey, notification).await
111 }
112
113 pub async fn send_capture_notification(
115 &self,
116 user_pubkey: &str,
117 cred_amount: f64,
118 merchant_name: &str,
119 ) -> Result<NotificationResult, NotificationError> {
120 let notification = PushNotification {
121 title: "Cred Captured! 🎉".into(),
122 body: format!(
123 "You earned {:.2} Cred at {}. Tap to see it grow.",
124 cred_amount,
125 merchant_name
126 ),
127 data: Some(NotificationData {
128 notification_type: "capture".into(),
129 cred_amount: Some(cred_amount),
130 merchant_name: Some(merchant_name.into()),
131 ..Default::default()
132 }),
133 sound: Some("default".into()),
134 priority: NotificationPriority::High,
135 badge: None,
136 };
137
138 self.send(user_pubkey, ¬ification).await
139 }
140
141 pub async fn send_stake_notification(
143 &self,
144 user_pubkey: &str,
145 cred_amount: f64,
146 duration_days: u16,
147 apy: f64,
148 ) -> Result<NotificationResult, NotificationError> {
149 let notification = PushNotification {
150 title: "Auto-Staked 📈".into(),
151 body: format!(
152 "{:.2} Cred staked for {} days at {:.1}% APY",
153 cred_amount, duration_days, apy
154 ),
155 data: Some(NotificationData {
156 notification_type: "stake".into(),
157 cred_amount: Some(cred_amount),
158 ..Default::default()
159 }),
160 sound: None, priority: NotificationPriority::Normal,
162 badge: None,
163 };
164
165 self.send(user_pubkey, ¬ification).await
166 }
167
168 pub async fn send_unlock_notification(
170 &self,
171 user_pubkey: &str,
172 cred_amount: f64,
173 yield_amount: f64,
174 hours_until: u16,
175 ) -> Result<NotificationResult, NotificationError> {
176 let notification = PushNotification {
177 title: "Position Unlocking Soon ⏰".into(),
178 body: format!(
179 "Your {:.2} Cred stake unlocks in {} hours with {:.2} Cred yield!",
180 cred_amount, hours_until, yield_amount
181 ),
182 data: Some(NotificationData {
183 notification_type: "unlock".into(),
184 cred_amount: Some(cred_amount),
185 ..Default::default()
186 }),
187 sound: Some("default".into()),
188 priority: NotificationPriority::High,
189 badge: None,
190 };
191
192 self.send(user_pubkey, ¬ification).await
193 }
194
195 async fn get_cached_token(&self, user_pubkey: &str) -> Result<Option<String>, NotificationError> {
201 use aws_sdk_dynamodb::types::AttributeValue;
202
203 let result = self.dynamo_client
204 .get_item()
205 .table_name(&self.config.dynamo_table)
206 .key("pk", AttributeValue::S(format!("USER#{}", user_pubkey)))
207 .key("sk", AttributeValue::S("PUSH_TOKEN".into()))
208 .send()
209 .await
210 .map_err(|e| NotificationError::DynamoError(e.to_string()))?;
211
212 if let Some(item) = result.item {
213 if let Some(AttributeValue::N(ttl_str)) = item.get("ttl") {
215 let ttl: i64 = ttl_str.parse().unwrap_or(0);
216 let now = SystemTime::now()
217 .duration_since(UNIX_EPOCH)
218 .unwrap()
219 .as_secs() as i64;
220
221 if ttl < now {
222 return Ok(None);
224 }
225 }
226
227 if let Some(AttributeValue::S(token)) = item.get("push_token") {
228 return Ok(Some(token.clone()));
229 }
230 }
231
232 Ok(None)
233 }
234
235 async fn cache_token(&self, user_pubkey: &str, token: &str) -> Result<(), NotificationError> {
237 use aws_sdk_dynamodb::types::AttributeValue;
238
239 let now = SystemTime::now()
240 .duration_since(UNIX_EPOCH)
241 .unwrap()
242 .as_secs() as i64;
243 let ttl = now + self.config.token_cache_ttl;
244
245 self.dynamo_client
246 .put_item()
247 .table_name(&self.config.dynamo_table)
248 .item("pk", AttributeValue::S(format!("USER#{}", user_pubkey)))
249 .item("sk", AttributeValue::S("PUSH_TOKEN".into()))
250 .item("push_token", AttributeValue::S(token.into()))
251 .item("cached_at", AttributeValue::N(now.to_string()))
252 .item("ttl", AttributeValue::N(ttl.to_string()))
253 .send()
254 .await
255 .map_err(|e| NotificationError::DynamoError(e.to_string()))?;
256
257 info!(user = %user_pubkey, "Push token cached in DynamoDB");
258 Ok(())
259 }
260
261 async fn fetch_token_from_supabase(&self, user_pubkey: &str) -> Result<Option<String>, NotificationError> {
263 let url = format!(
265 "{}/rest/v1/profiles?select=push_token&wallet_address=eq.{}",
266 self.config.supabase_url,
267 user_pubkey
268 );
269
270 let response = self.http_client
271 .get(&url)
272 .header("apikey", &self.config.supabase_service_key)
273 .header("Authorization", format!("Bearer {}", self.config.supabase_service_key))
274 .send()
275 .await
276 .map_err(|e| NotificationError::SupabaseError(e.to_string()))?;
277
278 if !response.status().is_success() {
279 let status = response.status();
280 let body = response.text().await.unwrap_or_default();
281 error!(status = %status, body = %body, "Supabase query failed");
282 return Err(NotificationError::SupabaseError(format!("HTTP {}: {}", status, body)));
283 }
284
285 #[derive(Deserialize)]
286 struct ProfileRow {
287 push_token: Option<String>,
288 }
289
290 let profiles: Vec<ProfileRow> = response.json().await
291 .map_err(|e| NotificationError::SupabaseError(e.to_string()))?;
292
293 Ok(profiles.first().and_then(|p| p.push_token.clone()))
294 }
295
296 async fn send_via_supabase(
298 &self,
299 push_token: &str,
300 user_pubkey: &str,
301 notification: &PushNotification,
302 ) -> Result<NotificationResult, NotificationError> {
303 let url = format!("{}/functions/v1/send-push", self.config.supabase_url);
304
305 let payload = SendPushPayload {
306 push_token: push_token.into(),
307 user_id: None, title: Some(notification.title.clone()),
309 body: Some(notification.body.clone()),
310 data: notification.data.clone(),
311 sound: notification.sound.clone(),
312 priority: match notification.priority {
313 NotificationPriority::High => "high",
314 NotificationPriority::Normal => "normal",
315 }.into(),
316 badge: notification.badge,
317 };
318
319 let response = self.http_client
320 .post(&url)
321 .header("Authorization", format!("Bearer {}", self.config.supabase_service_key))
322 .header("Content-Type", "application/json")
323 .json(&payload)
324 .send()
325 .await
326 .map_err(|e| NotificationError::SupabaseError(e.to_string()))?;
327
328 let status = response.status();
329 let body: SendPushResponse = response.json().await
330 .map_err(|e| NotificationError::SupabaseError(e.to_string()))?;
331
332 if body.sent {
333 info!(
334 user = %user_pubkey,
335 ticket_id = ?body.ticket_id,
336 "Push notification sent"
337 );
338 Ok(NotificationResult {
339 sent: true,
340 ticket_id: body.ticket_id,
341 error: None,
342 })
343 } else {
344 warn!(
345 user = %user_pubkey,
346 error = ?body.error,
347 "Push notification failed"
348 );
349
350 if body.error.as_deref() == Some("DeviceNotRegistered") {
352 self.clear_cached_token(user_pubkey).await.ok();
353 }
354
355 Ok(NotificationResult {
356 sent: false,
357 ticket_id: None,
358 error: body.error,
359 })
360 }
361 }
362
363 async fn clear_cached_token(&self, user_pubkey: &str) -> Result<(), NotificationError> {
365 use aws_sdk_dynamodb::types::AttributeValue;
366
367 self.dynamo_client
368 .delete_item()
369 .table_name(&self.config.dynamo_table)
370 .key("pk", AttributeValue::S(format!("USER#{}", user_pubkey)))
371 .key("sk", AttributeValue::S("PUSH_TOKEN".into()))
372 .send()
373 .await
374 .map_err(|e| NotificationError::DynamoError(e.to_string()))?;
375
376 info!(user = %user_pubkey, "Cleared invalid push token from cache");
377 Ok(())
378 }
379}
380
381#[derive(Debug, Clone, Serialize)]
387pub struct PushNotification {
388 pub title: String,
389 pub body: String,
390 pub data: Option<NotificationData>,
391 pub sound: Option<String>,
392 pub priority: NotificationPriority,
393 pub badge: Option<u32>,
394}
395
396#[derive(Debug, Clone, Copy, Serialize)]
397pub enum NotificationPriority {
398 High,
399 Normal,
400}
401
402#[derive(Debug, Clone, Default, Serialize, Deserialize)]
404pub struct NotificationData {
405 #[serde(rename = "type")]
406 pub notification_type: String,
407 #[serde(skip_serializing_if = "Option::is_none")]
408 pub cred_amount: Option<f64>,
409 #[serde(skip_serializing_if = "Option::is_none")]
410 pub merchant_name: Option<String>,
411 #[serde(skip_serializing_if = "Option::is_none")]
412 pub position_id: Option<String>,
413 #[serde(skip_serializing_if = "Option::is_none")]
414 pub transaction_id: Option<String>,
415}
416
417#[derive(Debug, Clone)]
419pub struct NotificationResult {
420 pub sent: bool,
421 pub ticket_id: Option<String>,
422 pub error: Option<String>,
423}
424
425#[derive(Debug, Serialize)]
427struct SendPushPayload {
428 push_token: String,
429 #[serde(skip_serializing_if = "Option::is_none")]
430 user_id: Option<String>,
431 #[serde(skip_serializing_if = "Option::is_none")]
432 title: Option<String>,
433 #[serde(skip_serializing_if = "Option::is_none")]
434 body: Option<String>,
435 #[serde(skip_serializing_if = "Option::is_none")]
436 data: Option<NotificationData>,
437 #[serde(skip_serializing_if = "Option::is_none")]
438 sound: Option<String>,
439 priority: String,
440 #[serde(skip_serializing_if = "Option::is_none")]
441 badge: Option<u32>,
442}
443
444#[derive(Debug, Deserialize)]
446struct SendPushResponse {
447 sent: bool,
448 ticket_id: Option<String>,
449 error: Option<String>,
450}
451
452#[derive(Debug, Clone)]
457pub enum NotificationError {
458 MissingConfig(String),
460 InitError(String),
462 DynamoError(String),
464 SupabaseError(String),
466 NoToken,
468}
469
470impl std::fmt::Display for NotificationError {
471 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
472 match self {
473 Self::MissingConfig(key) => write!(f, "Missing config: {}", key),
474 Self::InitError(msg) => write!(f, "Init error: {}", msg),
475 Self::DynamoError(msg) => write!(f, "DynamoDB error: {}", msg),
476 Self::SupabaseError(msg) => write!(f, "Supabase error: {}", msg),
477 Self::NoToken => write!(f, "User has no push token"),
478 }
479 }
480}
481
482impl std::error::Error for NotificationError {}
483
484#[cfg(test)]
485mod tests {
486 use super::*;
487
488 #[test]
489 fn notification_data_serializes() {
490 let data = NotificationData {
491 notification_type: "capture".into(),
492 cred_amount: Some(4.95),
493 merchant_name: Some("Miami Coffee".into()),
494 ..Default::default()
495 };
496
497 let json = serde_json::to_string(&data).unwrap();
498 assert!(json.contains("capture"));
499 assert!(json.contains("4.95"));
500 assert!(json.contains("Miami Coffee"));
501 }
502}