Skip to main content

loop_agent_sdk/
notifications.rs

1//! Loop Agent SDK - Push Notifications
2//! 
3//! Lightweight notification layer that:
4//! 1. Checks DynamoDB for cached push_token (<5ms)
5//! 2. Falls back to Supabase if not cached
6//! 3. Caches token in DynamoDB for future use
7//! 4. Calls Supabase send-push Edge Function
8//! 
9//! This keeps 99% of notifications under 100ms.
10
11use serde::{Deserialize, Serialize};
12use std::time::{SystemTime, UNIX_EPOCH};
13use tracing::{info, warn, error, instrument};
14
15// ============================================================================
16// CONFIGURATION
17// ============================================================================
18
/// Notification service configuration.
///
/// `Debug` is implemented by hand instead of being derived so that the
/// Supabase service role key is never rendered into logs or panic messages
/// when a configuration value is debug-printed.
#[derive(Clone)]
pub struct NotificationConfig {
    /// Supabase project URL
    pub supabase_url: String,
    /// Supabase service role key (for calling edge functions)
    pub supabase_service_key: String,
    /// DynamoDB table name
    pub dynamo_table: String,
    /// Cache TTL for push tokens (seconds)
    pub token_cache_ttl: i64,
}

impl std::fmt::Debug for NotificationConfig {
    /// Formats every field except the service key, which is replaced by a
    /// fixed placeholder.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("NotificationConfig")
            .field("supabase_url", &self.supabase_url)
            .field("supabase_service_key", &"<redacted>")
            .field("dynamo_table", &self.dynamo_table)
            .field("token_cache_ttl", &self.token_cache_ttl)
            .finish()
    }
}
31
32impl NotificationConfig {
33    /// Load from environment variables
34    pub fn from_env() -> Result<Self, NotificationError> {
35        Ok(Self {
36            supabase_url: std::env::var("SUPABASE_URL")
37                .map_err(|_| NotificationError::MissingConfig("SUPABASE_URL".into()))?,
38            supabase_service_key: std::env::var("SUPABASE_SERVICE_ROLE_KEY")
39                .map_err(|_| NotificationError::MissingConfig("SUPABASE_SERVICE_ROLE_KEY".into()))?,
40            dynamo_table: std::env::var("DYNAMO_TABLE")
41                .unwrap_or_else(|_| "loop-agent-state".into()),
42            token_cache_ttl: 86400 * 7, // 7 days
43        })
44    }
45}
46
47// ============================================================================
48// NOTIFICATION SERVICE
49// ============================================================================
50
51/// Push notification service with DynamoDB caching
52pub struct NotificationService {
53    config: NotificationConfig,
54    http_client: reqwest::Client,
55    dynamo_client: aws_sdk_dynamodb::Client,
56}
57
58impl NotificationService {
59    /// Create new notification service
60    pub async fn new(config: NotificationConfig) -> Result<Self, NotificationError> {
61        let aws_config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await;
62        let dynamo_client = aws_sdk_dynamodb::Client::new(&aws_config);
63        
64        let http_client = reqwest::Client::builder()
65            .timeout(std::time::Duration::from_secs(10))
66            .build()
67            .map_err(|e| NotificationError::InitError(e.to_string()))?;
68        
69        Ok(Self {
70            config,
71            http_client,
72            dynamo_client,
73        })
74    }
75    
76    /// Send push notification to user
77    /// 
78    /// Flow:
79    /// 1. Check DynamoDB cache for push_token
80    /// 2. If miss, query Supabase profiles table
81    /// 3. Cache token in DynamoDB
82    /// 4. Call Supabase send-push edge function
83    #[instrument(skip(self))]
84    pub async fn send(
85        &self,
86        user_pubkey: &str,
87        notification: &PushNotification,
88    ) -> Result<NotificationResult, NotificationError> {
89        // Step 1: Try to get push token from DynamoDB cache
90        let push_token = match self.get_cached_token(user_pubkey).await? {
91            Some(token) => {
92                info!(user = %user_pubkey, "Push token found in cache");
93                token
94            }
95            None => {
96                // Step 2: Query Supabase for token
97                info!(user = %user_pubkey, "Cache miss, querying Supabase");
98                let token = self.fetch_token_from_supabase(user_pubkey).await?;
99                
100                // Step 3: Cache it for next time
101                if let Some(ref t) = token {
102                    self.cache_token(user_pubkey, t).await?;
103                }
104                
105                token.ok_or(NotificationError::NoToken)?
106            }
107        };
108        
109        // Step 4: Send via Supabase edge function
110        self.send_via_supabase(&push_token, user_pubkey, notification).await
111    }
112    
113    /// Send capture notification (convenience method)
114    pub async fn send_capture_notification(
115        &self,
116        user_pubkey: &str,
117        cred_amount: f64,
118        merchant_name: &str,
119    ) -> Result<NotificationResult, NotificationError> {
120        let notification = PushNotification {
121            title: "Cred Captured! 🎉".into(),
122            body: format!(
123                "You earned {:.2} Cred at {}. Tap to see it grow.",
124                cred_amount,
125                merchant_name
126            ),
127            data: Some(NotificationData {
128                notification_type: "capture".into(),
129                cred_amount: Some(cred_amount),
130                merchant_name: Some(merchant_name.into()),
131                ..Default::default()
132            }),
133            sound: Some("default".into()),
134            priority: NotificationPriority::High,
135            badge: None,
136        };
137        
138        self.send(user_pubkey, &notification).await
139    }
140    
141    /// Send auto-stake notification
142    pub async fn send_stake_notification(
143        &self,
144        user_pubkey: &str,
145        cred_amount: f64,
146        duration_days: u16,
147        apy: f64,
148    ) -> Result<NotificationResult, NotificationError> {
149        let notification = PushNotification {
150            title: "Auto-Staked 📈".into(),
151            body: format!(
152                "{:.2} Cred staked for {} days at {:.1}% APY",
153                cred_amount, duration_days, apy
154            ),
155            data: Some(NotificationData {
156                notification_type: "stake".into(),
157                cred_amount: Some(cred_amount),
158                ..Default::default()
159            }),
160            sound: None, // Silent for auto-actions
161            priority: NotificationPriority::Normal,
162            badge: None,
163        };
164        
165        self.send(user_pubkey, &notification).await
166    }
167    
168    /// Send unlock reminder
169    pub async fn send_unlock_notification(
170        &self,
171        user_pubkey: &str,
172        cred_amount: f64,
173        yield_amount: f64,
174        hours_until: u16,
175    ) -> Result<NotificationResult, NotificationError> {
176        let notification = PushNotification {
177            title: "Position Unlocking Soon ⏰".into(),
178            body: format!(
179                "Your {:.2} Cred stake unlocks in {} hours with {:.2} Cred yield!",
180                cred_amount, hours_until, yield_amount
181            ),
182            data: Some(NotificationData {
183                notification_type: "unlock".into(),
184                cred_amount: Some(cred_amount),
185                ..Default::default()
186            }),
187            sound: Some("default".into()),
188            priority: NotificationPriority::High,
189            badge: None,
190        };
191        
192        self.send(user_pubkey, &notification).await
193    }
194    
195    // ========================================================================
196    // INTERNAL METHODS
197    // ========================================================================
198    
199    /// Get cached push token from DynamoDB
200    async fn get_cached_token(&self, user_pubkey: &str) -> Result<Option<String>, NotificationError> {
201        use aws_sdk_dynamodb::types::AttributeValue;
202        
203        let result = self.dynamo_client
204            .get_item()
205            .table_name(&self.config.dynamo_table)
206            .key("pk", AttributeValue::S(format!("USER#{}", user_pubkey)))
207            .key("sk", AttributeValue::S("PUSH_TOKEN".into()))
208            .send()
209            .await
210            .map_err(|e| NotificationError::DynamoError(e.to_string()))?;
211        
212        if let Some(item) = result.item {
213            // Check TTL
214            if let Some(AttributeValue::N(ttl_str)) = item.get("ttl") {
215                let ttl: i64 = ttl_str.parse().unwrap_or(0);
216                let now = SystemTime::now()
217                    .duration_since(UNIX_EPOCH)
218                    .unwrap()
219                    .as_secs() as i64;
220                
221                if ttl < now {
222                    // Token expired
223                    return Ok(None);
224                }
225            }
226            
227            if let Some(AttributeValue::S(token)) = item.get("push_token") {
228                return Ok(Some(token.clone()));
229            }
230        }
231        
232        Ok(None)
233    }
234    
235    /// Cache push token in DynamoDB
236    async fn cache_token(&self, user_pubkey: &str, token: &str) -> Result<(), NotificationError> {
237        use aws_sdk_dynamodb::types::AttributeValue;
238        
239        let now = SystemTime::now()
240            .duration_since(UNIX_EPOCH)
241            .unwrap()
242            .as_secs() as i64;
243        let ttl = now + self.config.token_cache_ttl;
244        
245        self.dynamo_client
246            .put_item()
247            .table_name(&self.config.dynamo_table)
248            .item("pk", AttributeValue::S(format!("USER#{}", user_pubkey)))
249            .item("sk", AttributeValue::S("PUSH_TOKEN".into()))
250            .item("push_token", AttributeValue::S(token.into()))
251            .item("cached_at", AttributeValue::N(now.to_string()))
252            .item("ttl", AttributeValue::N(ttl.to_string()))
253            .send()
254            .await
255            .map_err(|e| NotificationError::DynamoError(e.to_string()))?;
256        
257        info!(user = %user_pubkey, "Push token cached in DynamoDB");
258        Ok(())
259    }
260    
261    /// Fetch push token from Supabase profiles table
262    async fn fetch_token_from_supabase(&self, user_pubkey: &str) -> Result<Option<String>, NotificationError> {
263        // Query profiles table by wallet_address
264        let url = format!(
265            "{}/rest/v1/profiles?select=push_token&wallet_address=eq.{}",
266            self.config.supabase_url,
267            user_pubkey
268        );
269        
270        let response = self.http_client
271            .get(&url)
272            .header("apikey", &self.config.supabase_service_key)
273            .header("Authorization", format!("Bearer {}", self.config.supabase_service_key))
274            .send()
275            .await
276            .map_err(|e| NotificationError::SupabaseError(e.to_string()))?;
277        
278        if !response.status().is_success() {
279            let status = response.status();
280            let body = response.text().await.unwrap_or_default();
281            error!(status = %status, body = %body, "Supabase query failed");
282            return Err(NotificationError::SupabaseError(format!("HTTP {}: {}", status, body)));
283        }
284        
285        #[derive(Deserialize)]
286        struct ProfileRow {
287            push_token: Option<String>,
288        }
289        
290        let profiles: Vec<ProfileRow> = response.json().await
291            .map_err(|e| NotificationError::SupabaseError(e.to_string()))?;
292        
293        Ok(profiles.first().and_then(|p| p.push_token.clone()))
294    }
295    
296    /// Send notification via Supabase send-push edge function
297    async fn send_via_supabase(
298        &self,
299        push_token: &str,
300        user_pubkey: &str,
301        notification: &PushNotification,
302    ) -> Result<NotificationResult, NotificationError> {
303        let url = format!("{}/functions/v1/send-push", self.config.supabase_url);
304        
305        let payload = SendPushPayload {
306            push_token: push_token.into(),
307            user_id: None, // We use pubkey, not Supabase user_id
308            title: Some(notification.title.clone()),
309            body: Some(notification.body.clone()),
310            data: notification.data.clone(),
311            sound: notification.sound.clone(),
312            priority: match notification.priority {
313                NotificationPriority::High => "high",
314                NotificationPriority::Normal => "normal",
315            }.into(),
316            badge: notification.badge,
317        };
318        
319        let response = self.http_client
320            .post(&url)
321            .header("Authorization", format!("Bearer {}", self.config.supabase_service_key))
322            .header("Content-Type", "application/json")
323            .json(&payload)
324            .send()
325            .await
326            .map_err(|e| NotificationError::SupabaseError(e.to_string()))?;
327        
328        let status = response.status();
329        let body: SendPushResponse = response.json().await
330            .map_err(|e| NotificationError::SupabaseError(e.to_string()))?;
331        
332        if body.sent {
333            info!(
334                user = %user_pubkey,
335                ticket_id = ?body.ticket_id,
336                "Push notification sent"
337            );
338            Ok(NotificationResult {
339                sent: true,
340                ticket_id: body.ticket_id,
341                error: None,
342            })
343        } else {
344            warn!(
345                user = %user_pubkey,
346                error = ?body.error,
347                "Push notification failed"
348            );
349            
350            // If token is invalid, clear the cache
351            if body.error.as_deref() == Some("DeviceNotRegistered") {
352                self.clear_cached_token(user_pubkey).await.ok();
353            }
354            
355            Ok(NotificationResult {
356                sent: false,
357                ticket_id: None,
358                error: body.error,
359            })
360        }
361    }
362    
363    /// Clear cached push token (e.g., if it's invalid)
364    async fn clear_cached_token(&self, user_pubkey: &str) -> Result<(), NotificationError> {
365        use aws_sdk_dynamodb::types::AttributeValue;
366        
367        self.dynamo_client
368            .delete_item()
369            .table_name(&self.config.dynamo_table)
370            .key("pk", AttributeValue::S(format!("USER#{}", user_pubkey)))
371            .key("sk", AttributeValue::S("PUSH_TOKEN".into()))
372            .send()
373            .await
374            .map_err(|e| NotificationError::DynamoError(e.to_string()))?;
375        
376        info!(user = %user_pubkey, "Cleared invalid push token from cache");
377        Ok(())
378    }
379}
380
381// ============================================================================
382// DATA TYPES
383// ============================================================================
384
385/// Push notification to send
386#[derive(Debug, Clone, Serialize)]
387pub struct PushNotification {
388    pub title: String,
389    pub body: String,
390    pub data: Option<NotificationData>,
391    pub sound: Option<String>,
392    pub priority: NotificationPriority,
393    pub badge: Option<u32>,
394}
395
396#[derive(Debug, Clone, Copy, Serialize)]
397pub enum NotificationPriority {
398    High,
399    Normal,
400}
401
402/// Additional data payload
403#[derive(Debug, Clone, Default, Serialize, Deserialize)]
404pub struct NotificationData {
405    #[serde(rename = "type")]
406    pub notification_type: String,
407    #[serde(skip_serializing_if = "Option::is_none")]
408    pub cred_amount: Option<f64>,
409    #[serde(skip_serializing_if = "Option::is_none")]
410    pub merchant_name: Option<String>,
411    #[serde(skip_serializing_if = "Option::is_none")]
412    pub position_id: Option<String>,
413    #[serde(skip_serializing_if = "Option::is_none")]
414    pub transaction_id: Option<String>,
415}
416
/// Outcome reported after attempting to send a notification.
#[derive(Clone, Debug)]
pub struct NotificationResult {
    /// Whether the send-push function reported the notification as sent.
    pub sent: bool,
    /// Ticket identifier returned on success, if any.
    pub ticket_id: Option<String>,
    /// Error string returned by the send-push function on failure, if any.
    pub error: Option<String>,
}
424
425/// Payload for Supabase send-push function
426#[derive(Debug, Serialize)]
427struct SendPushPayload {
428    push_token: String,
429    #[serde(skip_serializing_if = "Option::is_none")]
430    user_id: Option<String>,
431    #[serde(skip_serializing_if = "Option::is_none")]
432    title: Option<String>,
433    #[serde(skip_serializing_if = "Option::is_none")]
434    body: Option<String>,
435    #[serde(skip_serializing_if = "Option::is_none")]
436    data: Option<NotificationData>,
437    #[serde(skip_serializing_if = "Option::is_none")]
438    sound: Option<String>,
439    priority: String,
440    #[serde(skip_serializing_if = "Option::is_none")]
441    badge: Option<u32>,
442}
443
444/// Response from Supabase send-push function
445#[derive(Debug, Deserialize)]
446struct SendPushResponse {
447    sent: bool,
448    ticket_id: Option<String>,
449    error: Option<String>,
450}
451
452// ============================================================================
453// ERRORS
454// ============================================================================
455
/// Errors produced by the notification layer.
#[derive(Clone, Debug)]
pub enum NotificationError {
    /// A required configuration value is missing; carries its name.
    MissingConfig(String),
    /// The service could not be initialised; carries the cause as text.
    InitError(String),
    /// A DynamoDB operation failed; carries the cause as text.
    DynamoError(String),
    /// A Supabase request failed; carries the cause as text.
    SupabaseError(String),
    /// The user has no push token.
    NoToken,
}

impl std::fmt::Display for NotificationError {
    /// Writes a short human-readable message for each variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::NoToken => f.write_str("User has no push token"),
            Self::SupabaseError(detail) => write!(f, "Supabase error: {}", detail),
            Self::DynamoError(detail) => write!(f, "DynamoDB error: {}", detail),
            Self::InitError(detail) => write!(f, "Init error: {}", detail),
            Self::MissingConfig(name) => write!(f, "Missing config: {}", name),
        }
    }
}

impl std::error::Error for NotificationError {}
483
#[cfg(test)]
mod tests {
    use super::*;

    /// Serializing a populated `NotificationData` must emit the type, the
    /// amount and the merchant name.
    #[test]
    fn notification_data_serializes() {
        let payload = NotificationData {
            notification_type: "capture".into(),
            cred_amount: Some(4.95),
            merchant_name: Some("Miami Coffee".into()),
            ..Default::default()
        };

        let encoded = serde_json::to_string(&payload).unwrap();

        for expected in &["capture", "4.95", "Miami Coffee"] {
            assert!(encoded.contains(*expected));
        }
    }
}
502}