Skip to main content

this/events/sinks/
push.rs

1//! Push notification sink — delivers via Expo, APNs, or FCM
2//!
3//! Uses the `PushProvider` trait to abstract the push notification backend.
4//! The default implementation is `ExpoPushProvider` which sends via the
5//! Expo Push API (<https://exp.host/--/api/v2/push/send>).
6//!
7//! # Retry strategy
8//!
9//! Failed sends are retried up to 3 times with exponential backoff:
10//! - Attempt 1: immediate
11//! - Attempt 2: after 100ms
12//! - Attempt 3: after 500ms
13//! - Attempt 4: after 2s
14//!
15//! Only server errors (5xx) and network errors are retried.
16//! Client errors (4xx) fail immediately.
17
18use crate::config::sinks::SinkType;
19use crate::events::sinks::Sink;
20use crate::events::sinks::device_tokens::DeviceTokenStore;
21use crate::events::sinks::preferences::NotificationPreferencesStore;
22use anyhow::{Result, anyhow};
23use async_trait::async_trait;
24use serde::{Deserialize, Serialize};
25use serde_json::Value;
26use std::collections::HashMap;
27use std::sync::Arc;
28use std::time::Duration;
29
30#[cfg(feature = "push")]
31use reqwest;
32
33/// Push message to send to a provider
34#[derive(Debug, Clone, Serialize, Deserialize)]
35pub struct PushMessage {
36    /// Device push token
37    pub to: String,
38
39    /// Notification title
40    pub title: String,
41
42    /// Notification body
43    pub body: String,
44
45    /// Extra data payload (passed to the app when notification is tapped)
46    #[serde(default, skip_serializing_if = "Value::is_null")]
47    pub data: Value,
48
49    /// Sound to play (default: "default")
50    #[serde(default = "default_sound")]
51    pub sound: String,
52}
53
54fn default_sound() -> String {
55    "default".to_string()
56}
57
58/// Result of a push send attempt
59#[derive(Debug, Clone)]
60pub enum PushResult {
61    /// Successfully sent
62    Success,
63    /// Failed with retriable error (server error, network issue)
64    RetriableError(String),
65    /// Failed with non-retriable error (invalid token, etc.)
66    PermanentError(String),
67}
68
69/// Trait for push notification providers
70///
71/// Abstracts the backend used to send push notifications.
72/// Implementations: `ExpoPushProvider` (default), future: `ApnsPushProvider`, `FcmPushProvider`
73#[async_trait]
74pub trait PushProvider: Send + Sync + std::fmt::Debug {
75    /// Send a batch of push messages
76    ///
77    /// Returns one `PushResult` per message, in the same order.
78    async fn send_batch(&self, messages: Vec<PushMessage>) -> Vec<PushResult>;
79
80    /// Provider name for logging
81    fn name(&self) -> &str;
82}
83
84/// Expo Push API provider
85///
86/// Sends push notifications via the Expo Push API.
87/// Works with Expo push tokens (format: "ExponentPushToken\[xxx\]").
88///
89/// Requires the `push` feature to be enabled.
90#[cfg(feature = "push")]
91#[derive(Debug)]
92pub struct ExpoPushProvider {
93    client: reqwest::Client,
94    api_url: String,
95}
96
97#[cfg(feature = "push")]
98impl ExpoPushProvider {
99    /// Create with default Expo API URL
100    pub fn new() -> Self {
101        Self {
102            client: reqwest::Client::new(),
103            api_url: "https://exp.host/--/api/v2/push/send".to_string(),
104        }
105    }
106
107    /// Create with a custom API URL (for testing)
108    pub fn with_url(url: String) -> Self {
109        Self {
110            client: reqwest::Client::new(),
111            api_url: url,
112        }
113    }
114}
115
116#[cfg(feature = "push")]
117impl Default for ExpoPushProvider {
118    fn default() -> Self {
119        Self::new()
120    }
121}
122
123#[cfg(feature = "push")]
124#[async_trait]
125impl PushProvider for ExpoPushProvider {
126    async fn send_batch(&self, messages: Vec<PushMessage>) -> Vec<PushResult> {
127        if messages.is_empty() {
128            return Vec::new();
129        }
130
131        // Expo API accepts an array of messages
132        let response = self.client.post(&self.api_url).json(&messages).send().await;
133
134        match response {
135            Ok(resp) => {
136                let status = resp.status();
137                if status.is_success() {
138                    // Parse Expo's response to check per-ticket status
139                    match resp.json::<ExpoResponse>().await {
140                        Ok(expo_resp) => expo_resp
141                            .data
142                            .into_iter()
143                            .map(|ticket| match ticket.status.as_str() {
144                                "ok" => PushResult::Success,
145                                "error" => {
146                                    let msg = ticket
147                                        .message
148                                        .unwrap_or_else(|| "unknown error".to_string());
149                                    // DeviceNotRegistered → permanent error
150                                    if ticket.details.as_ref().is_some_and(|d| {
151                                        d.get("error")
152                                            .and_then(|e| e.as_str())
153                                            .is_some_and(|e| e == "DeviceNotRegistered")
154                                    }) {
155                                        PushResult::PermanentError(msg)
156                                    } else {
157                                        PushResult::RetriableError(msg)
158                                    }
159                                }
160                                _ => PushResult::RetriableError(format!(
161                                    "unexpected status: {}",
162                                    ticket.status
163                                )),
164                            })
165                            .collect(),
166                        Err(e) => {
167                            // Couldn't parse response — treat all as retriable
168                            vec![
169                                PushResult::RetriableError(format!(
170                                    "failed to parse Expo response: {}",
171                                    e
172                                ));
173                                messages.len()
174                            ]
175                        }
176                    }
177                } else if status.is_server_error() {
178                    vec![
179                        PushResult::RetriableError(format!("server error: {}", status));
180                        messages.len()
181                    ]
182                } else {
183                    // 4xx → permanent error
184                    let body = resp.text().await.unwrap_or_default();
185                    vec![
186                        PushResult::PermanentError(format!("client error {}: {}", status, body));
187                        messages.len()
188                    ]
189                }
190            }
191            Err(e) => {
192                // Network error → retriable
193                vec![PushResult::RetriableError(format!("network error: {}", e)); messages.len()]
194            }
195        }
196    }
197
198    fn name(&self) -> &str {
199        "expo"
200    }
201}
202
203/// Expo Push API response format
204#[cfg(feature = "push")]
205#[derive(Debug, Deserialize)]
206struct ExpoResponse {
207    data: Vec<ExpoTicket>,
208}
209
210/// Individual push ticket from Expo
211#[cfg(feature = "push")]
212#[derive(Debug, Deserialize)]
213struct ExpoTicket {
214    status: String,
215    #[serde(default)]
216    message: Option<String>,
217    #[serde(default)]
218    details: Option<Value>,
219}
220
221/// Retry configuration for push delivery
222#[derive(Debug, Clone)]
223pub struct RetryConfig {
224    /// Maximum number of retry attempts (excluding the first attempt)
225    pub max_retries: u32,
226    /// Backoff durations for each retry attempt
227    pub backoff: Vec<Duration>,
228}
229
230impl Default for RetryConfig {
231    fn default() -> Self {
232        Self {
233            max_retries: 3,
234            backoff: vec![
235                Duration::from_millis(100),
236                Duration::from_millis(500),
237                Duration::from_secs(2),
238            ],
239        }
240    }
241}
242
243/// Push notification sink
244///
245/// Receives payloads from the `deliver` operator and sends push
246/// notifications to all registered device tokens for the recipient.
247///
248/// # Preferences
249///
250/// If a `NotificationPreferencesStore` is attached via `with_preferences`,
251/// the sink checks user preferences before sending. Disabled notification
252/// types are silently dropped (same pattern as `InAppNotificationSink`).
253///
254/// # Stale token cleanup
255///
256/// When a push provider returns `PermanentError` (e.g., `DeviceNotRegistered`),
257/// the corresponding device token is automatically unregistered from the store.
258#[derive(Debug)]
259pub struct PushNotificationSink {
260    /// Device token store
261    device_tokens: Arc<DeviceTokenStore>,
262
263    /// Push provider (Expo by default)
264    provider: Arc<dyn PushProvider>,
265
266    /// Retry configuration
267    retry_config: RetryConfig,
268
269    /// Optional preferences store (checks before delivering)
270    preferences: Option<Arc<NotificationPreferencesStore>>,
271}
272
273impl PushNotificationSink {
274    /// Create with default Expo provider and retry config
275    ///
276    /// Requires the `push` feature to be enabled.
277    #[cfg(feature = "push")]
278    pub fn new(device_tokens: Arc<DeviceTokenStore>) -> Self {
279        Self {
280            device_tokens,
281            provider: Arc::new(ExpoPushProvider::new()),
282            retry_config: RetryConfig::default(),
283            preferences: None,
284        }
285    }
286
287    /// Create with a custom push provider
288    pub fn with_provider(
289        device_tokens: Arc<DeviceTokenStore>,
290        provider: Arc<dyn PushProvider>,
291    ) -> Self {
292        Self {
293            device_tokens,
294            provider,
295            retry_config: RetryConfig::default(),
296            preferences: None,
297        }
298    }
299
300    /// Create with custom provider and retry config
301    pub fn with_config(
302        device_tokens: Arc<DeviceTokenStore>,
303        provider: Arc<dyn PushProvider>,
304        retry_config: RetryConfig,
305    ) -> Self {
306        Self {
307            device_tokens,
308            provider,
309            retry_config,
310            preferences: None,
311        }
312    }
313
314    /// Attach a preferences store to check before sending
315    ///
316    /// When set, the sink checks `is_enabled(recipient, notification_type)`
317    /// before sending. Disabled types are silently dropped.
318    pub fn with_preferences(mut self, preferences: Arc<NotificationPreferencesStore>) -> Self {
319        self.preferences = Some(preferences);
320        self
321    }
322
323    /// Send messages with retry logic
324    ///
325    /// Returns the list of tokens that had permanent errors (e.g., `DeviceNotRegistered`).
326    /// The caller should unregister these tokens from the store.
327    async fn send_with_retry(&self, messages: Vec<PushMessage>) -> Result<Vec<String>> {
328        let mut pending = messages;
329        let mut attempt = 0;
330        let mut permanently_failed_tokens: Vec<String> = Vec::new();
331
332        loop {
333            let results = self.provider.send_batch(pending.clone()).await;
334
335            let mut failed: Vec<PushMessage> = Vec::new();
336            let mut permanent_errors: Vec<String> = Vec::new();
337
338            for (msg, result) in pending.iter().zip(results.iter()) {
339                match result {
340                    PushResult::Success => {}
341                    PushResult::RetriableError(err) => {
342                        tracing::warn!(
343                            token = %msg.to,
344                            error = %err,
345                            attempt = attempt + 1,
346                            "push: retriable error"
347                        );
348                        failed.push(msg.clone());
349                    }
350                    PushResult::PermanentError(err) => {
351                        tracing::error!(
352                            token = %msg.to,
353                            error = %err,
354                            "push: permanent error (will not retry)"
355                        );
356                        permanently_failed_tokens.push(msg.to.clone());
357                        permanent_errors.push(err.clone());
358                    }
359                }
360            }
361
362            if failed.is_empty() {
363                if permanent_errors.is_empty() {
364                    return Ok(permanently_failed_tokens);
365                } else {
366                    // All retriable sent, but some had permanent errors
367                    // Still return the failed tokens for cleanup
368                    return Ok(permanently_failed_tokens);
369                }
370            }
371
372            attempt += 1;
373            if attempt > self.retry_config.max_retries {
374                return Err(anyhow!(
375                    "push: {} message(s) failed after {} retries",
376                    failed.len(),
377                    self.retry_config.max_retries
378                ));
379            }
380
381            // Backoff before retry
382            let backoff_idx = (attempt as usize - 1).min(self.retry_config.backoff.len() - 1);
383            let delay = self.retry_config.backoff[backoff_idx];
384            tracing::debug!(
385                attempt = attempt,
386                delay_ms = delay.as_millis(),
387                remaining = failed.len(),
388                "push: retrying after backoff"
389            );
390            tokio::time::sleep(delay).await;
391
392            pending = failed;
393        }
394    }
395}
396
397#[async_trait]
398impl Sink for PushNotificationSink {
399    async fn deliver(
400        &self,
401        payload: Value,
402        recipient_id: Option<&str>,
403        context_vars: &HashMap<String, Value>,
404    ) -> Result<()> {
405        // Determine recipient
406        let recipient = super::resolve_recipient(recipient_id, &payload, context_vars)
407            .ok_or_else(|| anyhow!("push sink: recipient_id not found"))?;
408
409        // Check preferences before sending (same pattern as InAppNotificationSink)
410        if let Some(prefs_store) = &self.preferences {
411            let notification_type = payload
412                .get("notification_type")
413                .and_then(|v| v.as_str())
414                .unwrap_or("generic");
415
416            if !prefs_store.is_enabled(&recipient, notification_type).await {
417                tracing::debug!(
418                    recipient = %recipient,
419                    notification_type = %notification_type,
420                    "push sink: notification type disabled by user preferences, skipping"
421                );
422                return Ok(());
423            }
424        }
425
426        // Get device tokens
427        let tokens = self.device_tokens.get_tokens(&recipient).await;
428        if tokens.is_empty() {
429            tracing::debug!(
430                recipient = %recipient,
431                "push sink: no device tokens registered, skipping"
432            );
433            return Ok(());
434        }
435
436        // Extract notification fields
437        let title = payload
438            .get("title")
439            .and_then(|v| v.as_str())
440            .unwrap_or("Notification")
441            .to_string();
442
443        let body = payload
444            .get("body")
445            .and_then(|v| v.as_str())
446            .unwrap_or("")
447            .to_string();
448
449        let data = payload.get("data").cloned().unwrap_or(Value::Null);
450
451        // Build messages — one per device token
452        let messages: Vec<PushMessage> = tokens
453            .into_iter()
454            .map(|dt| PushMessage {
455                to: dt.token,
456                title: title.clone(),
457                body: body.clone(),
458                data: data.clone(),
459                sound: "default".to_string(),
460            })
461            .collect();
462
463        tracing::debug!(
464            recipient = %recipient,
465            token_count = messages.len(),
466            provider = self.provider.name(),
467            "push sink: sending notifications"
468        );
469
470        // Send with retry; collect permanently failed tokens for cleanup
471        let stale_tokens = self.send_with_retry(messages).await?;
472
473        // Unregister stale tokens (e.g., DeviceNotRegistered)
474        for token in &stale_tokens {
475            tracing::info!(
476                recipient = %recipient,
477                token = %token,
478                "push sink: unregistering stale device token"
479            );
480            self.device_tokens.unregister(&recipient, token).await;
481        }
482
483        Ok(())
484    }
485
486    fn name(&self) -> &str {
487        "push"
488    }
489
490    fn sink_type(&self) -> SinkType {
491        SinkType::Push
492    }
493}
494
495#[cfg(test)]
496mod tests {
497    use super::*;
498    use crate::events::sinks::device_tokens::Platform;
499    use serde_json::json;
500    use std::sync::atomic::{AtomicUsize, Ordering};
501
502    // ── Mock push provider ──────────────────────────────────────────
503
504    /// Shared state for the mock push provider
505    #[derive(Debug, Clone)]
506    struct MockState {
507        results: Arc<tokio::sync::Mutex<Vec<Vec<PushResult>>>>,
508        call_count: Arc<AtomicUsize>,
509        received: Arc<tokio::sync::Mutex<Vec<Vec<PushMessage>>>>,
510    }
511
512    /// A mock push provider that records calls and returns configurable results
513    #[derive(Debug)]
514    struct MockPushProvider {
515        state: MockState,
516    }
517
518    impl MockPushProvider {
519        fn new(results: Vec<Vec<PushResult>>) -> (Self, MockState) {
520            let state = MockState {
521                results: Arc::new(tokio::sync::Mutex::new(results)),
522                call_count: Arc::new(AtomicUsize::new(0)),
523                received: Arc::new(tokio::sync::Mutex::new(Vec::new())),
524            };
525            (
526                Self {
527                    state: state.clone(),
528                },
529                state,
530            )
531        }
532
533        /// Provider that always succeeds
534        fn always_success() -> (Self, MockState) {
535            Self::new(Vec::new())
536        }
537    }
538
539    #[async_trait]
540    impl PushProvider for MockPushProvider {
541        async fn send_batch(&self, messages: Vec<PushMessage>) -> Vec<PushResult> {
542            let call_idx = self.state.call_count.fetch_add(1, Ordering::SeqCst);
543            self.state.received.lock().await.push(messages.clone());
544
545            let mut results = self.state.results.lock().await;
546            if call_idx < results.len() {
547                results[call_idx].drain(..).collect()
548            } else {
549                // Default: all success
550                vec![PushResult::Success; messages.len()]
551            }
552        }
553
554        fn name(&self) -> &str {
555            "mock"
556        }
557    }
558
559    fn fast_retry_config() -> RetryConfig {
560        RetryConfig {
561            max_retries: 3,
562            backoff: vec![
563                Duration::from_millis(1),
564                Duration::from_millis(1),
565                Duration::from_millis(1),
566            ],
567        }
568    }
569
570    // ── Tests ────────────────────────────────────────────────────────
571
572    #[tokio::test]
573    async fn test_push_deliver_success() {
574        let tokens = Arc::new(DeviceTokenStore::new());
575        tokens
576            .register(
577                "user-A",
578                "ExponentPushToken[abc]".to_string(),
579                Platform::Ios,
580            )
581            .await;
582
583        let (provider, state) = MockPushProvider::always_success();
584        let sink = PushNotificationSink::with_provider(tokens, Arc::new(provider));
585
586        let payload = json!({
587            "title": "New follower",
588            "body": "Alice followed you",
589            "recipient_id": "user-A",
590            "data": {"screen": "profile"}
591        });
592
593        sink.deliver(payload, None, &HashMap::new()).await.unwrap();
594
595        let calls = state.received.lock().await;
596        assert_eq!(calls.len(), 1);
597        assert_eq!(calls[0].len(), 1);
598        assert_eq!(calls[0][0].to, "ExponentPushToken[abc]");
599        assert_eq!(calls[0][0].title, "New follower");
600        assert_eq!(calls[0][0].body, "Alice followed you");
601        assert_eq!(calls[0][0].data, json!({"screen": "profile"}));
602    }
603
604    #[tokio::test]
605    async fn test_push_deliver_multiple_tokens() {
606        let tokens = Arc::new(DeviceTokenStore::new());
607        tokens
608            .register("user-A", "token-1".to_string(), Platform::Ios)
609            .await;
610        tokens
611            .register("user-A", "token-2".to_string(), Platform::Android)
612            .await;
613
614        let (provider, state) = MockPushProvider::always_success();
615        let sink = PushNotificationSink::with_provider(tokens, Arc::new(provider));
616
617        let payload = json!({
618            "title": "Test",
619            "body": "Hello",
620            "recipient_id": "user-A"
621        });
622
623        sink.deliver(payload, None, &HashMap::new()).await.unwrap();
624
625        let calls = state.received.lock().await;
626        assert_eq!(calls[0].len(), 2);
627        assert_eq!(calls[0][0].to, "token-1");
628        assert_eq!(calls[0][1].to, "token-2");
629    }
630
631    #[tokio::test]
632    async fn test_push_deliver_no_tokens_skips() {
633        let tokens = Arc::new(DeviceTokenStore::new());
634        let (provider, state) = MockPushProvider::always_success();
635        let sink = PushNotificationSink::with_provider(tokens, Arc::new(provider));
636
637        let payload = json!({
638            "title": "Test",
639            "recipient_id": "user-A"
640        });
641
642        // Should succeed silently (no tokens registered)
643        sink.deliver(payload, None, &HashMap::new()).await.unwrap();
644
645        // Provider should not have been called
646        assert_eq!(state.call_count.load(Ordering::SeqCst), 0);
647    }
648
649    #[tokio::test]
650    async fn test_push_deliver_no_recipient_error() {
651        let tokens = Arc::new(DeviceTokenStore::new());
652        let (provider, _state) = MockPushProvider::always_success();
653        let sink = PushNotificationSink::with_provider(tokens, Arc::new(provider));
654
655        let payload = json!({"title": "Test"});
656        let result = sink.deliver(payload, None, &HashMap::new()).await;
657        assert!(result.is_err());
658        assert!(result.unwrap_err().to_string().contains("recipient_id"));
659    }
660
661    #[tokio::test]
662    async fn test_push_retry_on_server_error() {
663        let tokens = Arc::new(DeviceTokenStore::new());
664        tokens
665            .register("user-A", "token-1".to_string(), Platform::Ios)
666            .await;
667
668        // First call: retriable error, second call: success
669        let (provider, state) = MockPushProvider::new(vec![
670            vec![PushResult::RetriableError("server error: 500".to_string())],
671            vec![PushResult::Success],
672        ]);
673
674        let sink =
675            PushNotificationSink::with_config(tokens, Arc::new(provider), fast_retry_config());
676
677        let payload = json!({
678            "title": "Test",
679            "recipient_id": "user-A"
680        });
681
682        sink.deliver(payload, None, &HashMap::new()).await.unwrap();
683
684        // Should have been called twice (initial + 1 retry)
685        assert_eq!(state.call_count.load(Ordering::SeqCst), 2);
686    }
687
688    #[tokio::test]
689    async fn test_push_no_retry_on_permanent_error() {
690        let tokens = Arc::new(DeviceTokenStore::new());
691        tokens
692            .register("user-A", "token-1".to_string(), Platform::Ios)
693            .await;
694
695        let (provider, state) = MockPushProvider::new(vec![vec![PushResult::PermanentError(
696            "DeviceNotRegistered".to_string(),
697        )]]);
698
699        let sink = PushNotificationSink::with_config(
700            tokens.clone(),
701            Arc::new(provider),
702            fast_retry_config(),
703        );
704
705        let payload = json!({
706            "title": "Test",
707            "recipient_id": "user-A"
708        });
709
710        // Permanent errors are now handled gracefully: token cleaned up, no error
711        let result = sink.deliver(payload, None, &HashMap::new()).await;
712        assert!(result.is_ok());
713
714        // Should only have been called once (no retry)
715        assert_eq!(state.call_count.load(Ordering::SeqCst), 1);
716
717        // Stale token should have been cleaned up
718        assert_eq!(tokens.token_count("user-A").await, 0);
719    }
720
721    #[tokio::test]
722    async fn test_push_max_retries_exceeded() {
723        let tokens = Arc::new(DeviceTokenStore::new());
724        tokens
725            .register("user-A", "token-1".to_string(), Platform::Ios)
726            .await;
727
728        // Always returns retriable error
729        let (provider, state) = MockPushProvider::new(vec![
730            vec![PushResult::RetriableError("error 1".to_string())],
731            vec![PushResult::RetriableError("error 2".to_string())],
732            vec![PushResult::RetriableError("error 3".to_string())],
733            vec![PushResult::RetriableError("error 4".to_string())],
734        ]);
735
736        let sink =
737            PushNotificationSink::with_config(tokens, Arc::new(provider), fast_retry_config());
738
739        let payload = json!({
740            "title": "Test",
741            "recipient_id": "user-A"
742        });
743
744        let result = sink.deliver(payload, None, &HashMap::new()).await;
745        assert!(result.is_err());
746        assert!(result.unwrap_err().to_string().contains("after 3 retries"));
747
748        // 1 initial + 3 retries = 4 calls
749        assert_eq!(state.call_count.load(Ordering::SeqCst), 4);
750    }
751
752    #[tokio::test]
753    async fn test_push_explicit_recipient_overrides_payload() {
754        let tokens = Arc::new(DeviceTokenStore::new());
755        tokens
756            .register("user-B", "token-B".to_string(), Platform::Ios)
757            .await;
758
759        let (provider, state) = MockPushProvider::always_success();
760        let sink = PushNotificationSink::with_provider(tokens, Arc::new(provider));
761
762        // Payload says user-A, but explicit param says user-B
763        let payload = json!({
764            "title": "Test",
765            "recipient_id": "user-A"
766        });
767
768        sink.deliver(payload, Some("user-B"), &HashMap::new())
769            .await
770            .unwrap();
771
772        let calls = state.received.lock().await;
773        assert_eq!(calls[0][0].to, "token-B");
774    }
775
776    #[tokio::test]
777    async fn test_push_message_serialization() {
778        let msg = PushMessage {
779            to: "ExponentPushToken[abc]".to_string(),
780            title: "Hello".to_string(),
781            body: "World".to_string(),
782            data: json!({"screen": "home"}),
783            sound: "default".to_string(),
784        };
785
786        let json = serde_json::to_value(&msg).unwrap();
787        assert_eq!(json["to"], "ExponentPushToken[abc]");
788        assert_eq!(json["title"], "Hello");
789        assert_eq!(json["body"], "World");
790        assert_eq!(json["data"]["screen"], "home");
791        assert_eq!(json["sound"], "default");
792    }
793
794    #[tokio::test]
795    async fn test_push_message_null_data_omitted() {
796        let msg = PushMessage {
797            to: "token".to_string(),
798            title: "Test".to_string(),
799            body: "Body".to_string(),
800            data: Value::Null,
801            sound: "default".to_string(),
802        };
803
804        let json = serde_json::to_value(&msg).unwrap();
805        assert!(!json.as_object().unwrap().contains_key("data"));
806    }
807
808    #[test]
809    fn test_sink_name_and_type() {
810        let tokens = Arc::new(DeviceTokenStore::new());
811        let (provider, _state) = MockPushProvider::always_success();
812        let sink = PushNotificationSink::with_provider(tokens, Arc::new(provider));
813        assert_eq!(sink.name(), "push");
814        assert_eq!(sink.sink_type(), SinkType::Push);
815    }
816
817    // ── Preferences integration tests ─────────────────────────────
818
819    #[tokio::test]
820    async fn test_push_with_preferences_disabled_type_skipped() {
821        let tokens = Arc::new(DeviceTokenStore::new());
822        tokens
823            .register("user-A", "token-1".to_string(), Platform::Ios)
824            .await;
825
826        let prefs = Arc::new(NotificationPreferencesStore::new());
827        prefs.disable_type("user-A", "new_like").await;
828
829        let (provider, state) = MockPushProvider::always_success();
830        let sink =
831            PushNotificationSink::with_provider(tokens, Arc::new(provider)).with_preferences(prefs);
832
833        // Deliver a disabled type — should be skipped
834        let payload = json!({
835            "title": "New like",
836            "notification_type": "new_like",
837            "recipient_id": "user-A"
838        });
839        sink.deliver(payload, None, &HashMap::new()).await.unwrap();
840        assert_eq!(state.call_count.load(Ordering::SeqCst), 0);
841
842        // Deliver an enabled type — should send
843        let payload = json!({
844            "title": "New follower",
845            "notification_type": "new_follower",
846            "recipient_id": "user-A"
847        });
848        sink.deliver(payload, None, &HashMap::new()).await.unwrap();
849        assert_eq!(state.call_count.load(Ordering::SeqCst), 1);
850    }
851
852    #[tokio::test]
853    async fn test_push_with_preferences_muted_user_skipped() {
854        let tokens = Arc::new(DeviceTokenStore::new());
855        tokens
856            .register("user-A", "token-1".to_string(), Platform::Ios)
857            .await;
858
859        let prefs = Arc::new(NotificationPreferencesStore::new());
860        prefs.mute("user-A").await;
861
862        let (provider, state) = MockPushProvider::always_success();
863        let sink =
864            PushNotificationSink::with_provider(tokens, Arc::new(provider)).with_preferences(prefs);
865
866        let payload = json!({
867            "title": "Test",
868            "notification_type": "new_follower",
869            "recipient_id": "user-A"
870        });
871        sink.deliver(payload, None, &HashMap::new()).await.unwrap();
872        assert_eq!(state.call_count.load(Ordering::SeqCst), 0);
873    }
874
875    #[tokio::test]
876    async fn test_push_without_preferences_delivers_all() {
877        let tokens = Arc::new(DeviceTokenStore::new());
878        tokens
879            .register("user-A", "token-1".to_string(), Platform::Ios)
880            .await;
881
882        let (provider, state) = MockPushProvider::always_success();
883        // No preferences store
884        let sink = PushNotificationSink::with_provider(tokens, Arc::new(provider));
885
886        let payload = json!({
887            "title": "Test",
888            "notification_type": "new_like",
889            "recipient_id": "user-A"
890        });
891        sink.deliver(payload, None, &HashMap::new()).await.unwrap();
892        assert_eq!(state.call_count.load(Ordering::SeqCst), 1);
893    }
894
895    // ── Stale token cleanup tests ─────────────────────────────────
896
897    #[tokio::test]
898    async fn test_push_permanent_error_unregisters_stale_token() {
899        let tokens = Arc::new(DeviceTokenStore::new());
900        tokens
901            .register("user-A", "good-token".to_string(), Platform::Ios)
902            .await;
903        tokens
904            .register("user-A", "stale-token".to_string(), Platform::Android)
905            .await;
906        assert_eq!(tokens.token_count("user-A").await, 2);
907
908        // First token succeeds, second gets DeviceNotRegistered
909        let (provider, _state) = MockPushProvider::new(vec![vec![
910            PushResult::Success,
911            PushResult::PermanentError("DeviceNotRegistered".to_string()),
912        ]]);
913
914        let sink = PushNotificationSink::with_config(
915            tokens.clone(),
916            Arc::new(provider),
917            fast_retry_config(),
918        );
919
920        let payload = json!({
921            "title": "Test",
922            "recipient_id": "user-A"
923        });
924
925        // Should succeed (stale token cleaned up silently)
926        sink.deliver(payload, None, &HashMap::new()).await.unwrap();
927
928        // Stale token should be unregistered
929        assert_eq!(tokens.token_count("user-A").await, 1);
930        let remaining = tokens.get_tokens("user-A").await;
931        assert_eq!(remaining[0].token, "good-token");
932    }
933
934    #[tokio::test]
935    async fn test_push_all_tokens_permanent_error_cleans_all() {
936        let tokens = Arc::new(DeviceTokenStore::new());
937        tokens
938            .register("user-A", "dead-1".to_string(), Platform::Ios)
939            .await;
940        tokens
941            .register("user-A", "dead-2".to_string(), Platform::Android)
942            .await;
943
944        let (provider, _state) = MockPushProvider::new(vec![vec![
945            PushResult::PermanentError("DeviceNotRegistered".to_string()),
946            PushResult::PermanentError("DeviceNotRegistered".to_string()),
947        ]]);
948
949        let sink = PushNotificationSink::with_config(
950            tokens.clone(),
951            Arc::new(provider),
952            fast_retry_config(),
953        );
954
955        let payload = json!({
956            "title": "Test",
957            "recipient_id": "user-A"
958        });
959
960        sink.deliver(payload, None, &HashMap::new()).await.unwrap();
961
962        // All tokens cleaned up
963        assert_eq!(tokens.token_count("user-A").await, 0);
964    }
965}