1use crate::config::sinks::SinkType;
19use crate::events::sinks::Sink;
20use crate::events::sinks::device_tokens::DeviceTokenStore;
21use crate::events::sinks::preferences::NotificationPreferencesStore;
22use anyhow::{Result, anyhow};
23use async_trait::async_trait;
24use serde::{Deserialize, Serialize};
25use serde_json::Value;
26use std::collections::HashMap;
27use std::sync::Arc;
28use std::time::Duration;
29
30#[cfg(feature = "push")]
31use reqwest;
32
33#[derive(Debug, Clone, Serialize, Deserialize)]
35pub struct PushMessage {
36 pub to: String,
38
39 pub title: String,
41
42 pub body: String,
44
45 #[serde(default, skip_serializing_if = "Value::is_null")]
47 pub data: Value,
48
49 #[serde(default = "default_sound")]
51 pub sound: String,
52}
53
/// Serde default for `PushMessage::sound`: the literal sound name `"default"`.
fn default_sound() -> String {
    String::from("default")
}
57
/// Outcome of sending one `PushMessage` through a `PushProvider`.
#[derive(Clone, Debug)]
pub enum PushResult {
    /// The provider accepted the message.
    Success,
    /// The send failed and may be attempted again; carries a human-readable reason.
    /// `PushNotificationSink` re-sends messages that come back with this variant.
    RetriableError(String),
    /// The send failed and must not be attempted again; carries a human-readable reason.
    /// `PushNotificationSink` unregisters the device token of such a message.
    PermanentError(String),
}
68
69#[async_trait]
74pub trait PushProvider: Send + Sync + std::fmt::Debug {
75 async fn send_batch(&self, messages: Vec<PushMessage>) -> Vec<PushResult>;
79
80 fn name(&self) -> &str;
82}
83
/// `PushProvider` implementation that posts message batches to an Expo-style HTTP endpoint.
///
/// Only compiled when the `push` feature is enabled.
#[cfg(feature = "push")]
#[derive(Debug)]
pub struct ExpoPushProvider {
    /// HTTP client used for every request.
    client: reqwest::Client,
    /// URL that batches are POSTed to.
    api_url: String,
}
96
#[cfg(feature = "push")]
impl ExpoPushProvider {
    /// Creates a provider that posts to the public Expo push endpoint
    /// (`https://exp.host/--/api/v2/push/send`).
    pub fn new() -> Self {
        Self::with_url(String::from("https://exp.host/--/api/v2/push/send"))
    }

    /// Creates a provider that posts to `url` instead of the public Expo endpoint.
    pub fn with_url(url: String) -> Self {
        let client = reqwest::Client::new();
        Self {
            client,
            api_url: url,
        }
    }
}
115
#[cfg(feature = "push")]
impl Default for ExpoPushProvider {
    /// Same as `ExpoPushProvider::new()`.
    fn default() -> Self {
        ExpoPushProvider::new()
    }
}
122
#[cfg(feature = "push")]
#[async_trait]
impl PushProvider for ExpoPushProvider {
    /// Posts `messages` to the configured endpoint as one JSON array and returns exactly one
    /// `PushResult` per message, in the same order.
    ///
    /// Outcome mapping:
    /// - transport failure, HTTP 5xx, HTTP 408/429, or a success response whose body cannot
    ///   be parsed: every message is reported as `RetriableError`;
    /// - any other non-success status: every message is reported as `PermanentError`, with
    ///   the response body included in the reason;
    /// - success: each ticket is mapped individually by `expo_ticket_to_result`, and the
    ///   result list is padded (with `RetriableError`) or truncated to the number of messages.
    ///
    /// An empty `messages` returns an empty vector without making a request.
    async fn send_batch(&self, messages: Vec<PushMessage>) -> Vec<PushResult> {
        if messages.is_empty() {
            return Vec::new();
        }
        let expected = messages.len();

        let resp = match self.client.post(&self.api_url).json(&messages).send().await {
            Ok(resp) => resp,
            Err(e) => {
                return vec![
                    PushResult::RetriableError(format!("network error: {}", e));
                    expected
                ];
            }
        };

        let status = resp.status();
        if status.is_server_error() {
            return vec![
                PushResult::RetriableError(format!("server error: {}", status));
                expected
            ];
        }

        // Throttling and request timeouts are transient. They must not be reported as
        // permanent: `PushNotificationSink::deliver` unregisters the device token of every
        // message that comes back as `PermanentError`.
        if status == reqwest::StatusCode::TOO_MANY_REQUESTS
            || status == reqwest::StatusCode::REQUEST_TIMEOUT
        {
            return vec![
                PushResult::RetriableError(format!("transient client error: {}", status));
                expected
            ];
        }

        if !status.is_success() {
            // NOTE(review): other non-success statuses (e.g. 400/401) are still permanent and
            // therefore also lead to token unregistration in `deliver` — confirm intended.
            let body = resp.text().await.unwrap_or_default();
            return vec![
                PushResult::PermanentError(format!("client error {}: {}", status, body));
                expected
            ];
        }

        let parsed = match resp.json::<ExpoResponse>().await {
            Ok(parsed) => parsed,
            Err(e) => {
                return vec![
                    PushResult::RetriableError(format!(
                        "failed to parse Expo response: {}",
                        e
                    ));
                    expected
                ];
            }
        };

        let mut results: Vec<PushResult> = parsed
            .data
            .into_iter()
            .map(expo_ticket_to_result)
            .collect();

        // Callers pair results with messages by position. A short ticket list would leave
        // trailing messages without a result, so fill the gap with a retriable error instead
        // of letting them be mistaken for successes.
        if results.len() != expected {
            tracing::warn!(
                expected = expected,
                received = results.len(),
                "push: Expo ticket count does not match message count"
            );
            results.resize(
                expected,
                PushResult::RetriableError("missing ticket in Expo response".to_string()),
            );
        }

        results
    }

    /// Returns the provider identifier `"expo"`.
    fn name(&self) -> &str {
        "expo"
    }
}

/// Maps one ticket from the response to a `PushResult`.
///
/// `"ok"` is a success. `"error"` is permanent only when `details.error` equals
/// `"DeviceNotRegistered"`; every other error, and any unrecognized status, is retriable.
#[cfg(feature = "push")]
fn expo_ticket_to_result(ticket: ExpoTicket) -> PushResult {
    match ticket.status.as_str() {
        "ok" => PushResult::Success,
        "error" => {
            let msg = ticket
                .message
                .unwrap_or_else(|| "unknown error".to_string());
            let device_not_registered = ticket
                .details
                .as_ref()
                .and_then(|d| d.get("error"))
                .and_then(Value::as_str)
                == Some("DeviceNotRegistered");
            if device_not_registered {
                PushResult::PermanentError(msg)
            } else {
                PushResult::RetriableError(msg)
            }
        }
        other => PushResult::RetriableError(format!("unexpected status: {}", other)),
    }
}
202
/// Body of a successful response from the push endpoint, as read by
/// `ExpoPushProvider::send_batch`.
#[cfg(feature = "push")]
#[derive(Debug, Deserialize)]
struct ExpoResponse {
    /// Tickets returned by the service; `send_batch` maps each one to a `PushResult`.
    data: Vec<ExpoTicket>,
}
209
/// One entry of `ExpoResponse::data`.
#[cfg(feature = "push")]
#[derive(Debug, Deserialize)]
struct ExpoTicket {
    /// Ticket status; `send_batch` recognizes `"ok"` and `"error"`.
    status: String,
    /// Optional error description; `None` when the field is absent.
    #[serde(default)]
    message: Option<String>,
    /// Optional structured error details; `send_batch` inspects its `error` key.
    #[serde(default)]
    details: Option<Value>,
}
220
/// Retry policy applied by `PushNotificationSink` to retriable send failures.
#[derive(Clone, Debug)]
pub struct RetryConfig {
    /// How many times a failed batch is re-sent after the initial attempt.
    pub max_retries: u32,
    /// Delay to wait before each retry, indexed by retry number (first retry uses the first
    /// entry). When there are more retries than entries, the last entry is reused.
    pub backoff: Vec<Duration>,
}
229
230impl Default for RetryConfig {
231 fn default() -> Self {
232 Self {
233 max_retries: 3,
234 backoff: vec![
235 Duration::from_millis(100),
236 Duration::from_millis(500),
237 Duration::from_secs(2),
238 ],
239 }
240 }
241}
242
243#[derive(Debug)]
259pub struct PushNotificationSink {
260 device_tokens: Arc<DeviceTokenStore>,
262
263 provider: Arc<dyn PushProvider>,
265
266 retry_config: RetryConfig,
268
269 preferences: Option<Arc<NotificationPreferencesStore>>,
271}
272
273impl PushNotificationSink {
274 #[cfg(feature = "push")]
278 pub fn new(device_tokens: Arc<DeviceTokenStore>) -> Self {
279 Self {
280 device_tokens,
281 provider: Arc::new(ExpoPushProvider::new()),
282 retry_config: RetryConfig::default(),
283 preferences: None,
284 }
285 }
286
287 pub fn with_provider(
289 device_tokens: Arc<DeviceTokenStore>,
290 provider: Arc<dyn PushProvider>,
291 ) -> Self {
292 Self {
293 device_tokens,
294 provider,
295 retry_config: RetryConfig::default(),
296 preferences: None,
297 }
298 }
299
300 pub fn with_config(
302 device_tokens: Arc<DeviceTokenStore>,
303 provider: Arc<dyn PushProvider>,
304 retry_config: RetryConfig,
305 ) -> Self {
306 Self {
307 device_tokens,
308 provider,
309 retry_config,
310 preferences: None,
311 }
312 }
313
314 pub fn with_preferences(mut self, preferences: Arc<NotificationPreferencesStore>) -> Self {
319 self.preferences = Some(preferences);
320 self
321 }
322
323 async fn send_with_retry(&self, messages: Vec<PushMessage>) -> Result<Vec<String>> {
328 let mut pending = messages;
329 let mut attempt = 0;
330 let mut permanently_failed_tokens: Vec<String> = Vec::new();
331
332 loop {
333 let results = self.provider.send_batch(pending.clone()).await;
334
335 let mut failed: Vec<PushMessage> = Vec::new();
336 let mut permanent_errors: Vec<String> = Vec::new();
337
338 for (msg, result) in pending.iter().zip(results.iter()) {
339 match result {
340 PushResult::Success => {}
341 PushResult::RetriableError(err) => {
342 tracing::warn!(
343 token = %msg.to,
344 error = %err,
345 attempt = attempt + 1,
346 "push: retriable error"
347 );
348 failed.push(msg.clone());
349 }
350 PushResult::PermanentError(err) => {
351 tracing::error!(
352 token = %msg.to,
353 error = %err,
354 "push: permanent error (will not retry)"
355 );
356 permanently_failed_tokens.push(msg.to.clone());
357 permanent_errors.push(err.clone());
358 }
359 }
360 }
361
362 if failed.is_empty() {
363 if permanent_errors.is_empty() {
364 return Ok(permanently_failed_tokens);
365 } else {
366 return Ok(permanently_failed_tokens);
369 }
370 }
371
372 attempt += 1;
373 if attempt > self.retry_config.max_retries {
374 return Err(anyhow!(
375 "push: {} message(s) failed after {} retries",
376 failed.len(),
377 self.retry_config.max_retries
378 ));
379 }
380
381 let backoff_idx = (attempt as usize - 1).min(self.retry_config.backoff.len() - 1);
383 let delay = self.retry_config.backoff[backoff_idx];
384 tracing::debug!(
385 attempt = attempt,
386 delay_ms = delay.as_millis(),
387 remaining = failed.len(),
388 "push: retrying after backoff"
389 );
390 tokio::time::sleep(delay).await;
391
392 pending = failed;
393 }
394 }
395}
396
397#[async_trait]
398impl Sink for PushNotificationSink {
399 async fn deliver(
400 &self,
401 payload: Value,
402 recipient_id: Option<&str>,
403 context_vars: &HashMap<String, Value>,
404 ) -> Result<()> {
405 let recipient = super::resolve_recipient(recipient_id, &payload, context_vars)
407 .ok_or_else(|| anyhow!("push sink: recipient_id not found"))?;
408
409 if let Some(prefs_store) = &self.preferences {
411 let notification_type = payload
412 .get("notification_type")
413 .and_then(|v| v.as_str())
414 .unwrap_or("generic");
415
416 if !prefs_store.is_enabled(&recipient, notification_type).await {
417 tracing::debug!(
418 recipient = %recipient,
419 notification_type = %notification_type,
420 "push sink: notification type disabled by user preferences, skipping"
421 );
422 return Ok(());
423 }
424 }
425
426 let tokens = self.device_tokens.get_tokens(&recipient).await;
428 if tokens.is_empty() {
429 tracing::debug!(
430 recipient = %recipient,
431 "push sink: no device tokens registered, skipping"
432 );
433 return Ok(());
434 }
435
436 let title = payload
438 .get("title")
439 .and_then(|v| v.as_str())
440 .unwrap_or("Notification")
441 .to_string();
442
443 let body = payload
444 .get("body")
445 .and_then(|v| v.as_str())
446 .unwrap_or("")
447 .to_string();
448
449 let data = payload.get("data").cloned().unwrap_or(Value::Null);
450
451 let messages: Vec<PushMessage> = tokens
453 .into_iter()
454 .map(|dt| PushMessage {
455 to: dt.token,
456 title: title.clone(),
457 body: body.clone(),
458 data: data.clone(),
459 sound: "default".to_string(),
460 })
461 .collect();
462
463 tracing::debug!(
464 recipient = %recipient,
465 token_count = messages.len(),
466 provider = self.provider.name(),
467 "push sink: sending notifications"
468 );
469
470 let stale_tokens = self.send_with_retry(messages).await?;
472
473 for token in &stale_tokens {
475 tracing::info!(
476 recipient = %recipient,
477 token = %token,
478 "push sink: unregistering stale device token"
479 );
480 self.device_tokens.unregister(&recipient, token).await;
481 }
482
483 Ok(())
484 }
485
486 fn name(&self) -> &str {
487 "push"
488 }
489
490 fn sink_type(&self) -> SinkType {
491 SinkType::Push
492 }
493}
494
#[cfg(test)]
mod tests {
    use super::*;
    use crate::events::sinks::device_tokens::Platform;
    use serde_json::json;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Handles shared between a `MockPushProvider` and the test that created it, so the test
    /// can inspect the provider after handing it to the sink.
    #[derive(Debug, Clone)]
    struct MockState {
        /// Scripted results: entry `i` is returned by the `i`-th `send_batch` call.
        results: Arc<tokio::sync::Mutex<Vec<Vec<PushResult>>>>,
        /// Number of `send_batch` calls made so far.
        call_count: Arc<AtomicUsize>,
        /// Every batch passed to `send_batch`, in call order.
        received: Arc<tokio::sync::Mutex<Vec<Vec<PushMessage>>>>,
    }

    /// Provider that records its input and replays scripted results.
    #[derive(Debug)]
    struct MockPushProvider {
        state: MockState,
    }

    impl MockPushProvider {
        /// Creates a provider that replays `results` call by call, plus a handle to its state.
        fn new(results: Vec<Vec<PushResult>>) -> (Self, MockState) {
            let state = MockState {
                results: Arc::new(tokio::sync::Mutex::new(results)),
                call_count: Arc::new(AtomicUsize::new(0)),
                received: Arc::new(tokio::sync::Mutex::new(Vec::new())),
            };
            let provider = Self {
                state: state.clone(),
            };
            (provider, state)
        }

        /// Creates a provider with no script, so every message of every call succeeds.
        fn always_success() -> (Self, MockState) {
            Self::new(vec![])
        }
    }

    #[async_trait]
    impl PushProvider for MockPushProvider {
        async fn send_batch(&self, messages: Vec<PushMessage>) -> Vec<PushResult> {
            let call_idx = self.state.call_count.fetch_add(1, Ordering::SeqCst);
            let batch_len = messages.len();
            self.state.received.lock().await.push(messages);

            // Calls beyond the end of the script succeed for every message.
            let mut scripted = self.state.results.lock().await;
            match scripted.get_mut(call_idx) {
                Some(batch) => std::mem::take(batch),
                None => vec![PushResult::Success; batch_len],
            }
        }

        fn name(&self) -> &str {
            "mock"
        }
    }

    /// Retry policy with three retries and 1 ms delays, to keep tests fast.
    fn fast_retry_config() -> RetryConfig {
        RetryConfig {
            max_retries: 3,
            backoff: vec![Duration::from_millis(1); 3],
        }
    }

    /// Builds a token store in which `user` has `devices` registered, in the given order.
    async fn store_with(user: &str, devices: Vec<(&str, Platform)>) -> Arc<DeviceTokenStore> {
        let store = Arc::new(DeviceTokenStore::new());
        for (token, platform) in devices {
            store.register(user, token.to_string(), platform).await;
        }
        store
    }

    /// Delivers `payload` with no explicit recipient and no context variables.
    async fn deliver_payload(sink: &PushNotificationSink, payload: Value) -> Result<()> {
        sink.deliver(payload, None, &HashMap::new()).await
    }

    /// Number of `send_batch` calls the mock has received.
    fn calls(state: &MockState) -> usize {
        state.call_count.load(Ordering::SeqCst)
    }

    #[tokio::test]
    async fn test_push_deliver_success() {
        let tokens = store_with("user-A", vec![("ExponentPushToken[abc]", Platform::Ios)]).await;
        let (provider, state) = MockPushProvider::always_success();
        let sink = PushNotificationSink::with_provider(tokens, Arc::new(provider));

        let payload = json!({
            "title": "New follower",
            "body": "Alice followed you",
            "recipient_id": "user-A",
            "data": {"screen": "profile"}
        });
        deliver_payload(&sink, payload).await.unwrap();

        let batches = state.received.lock().await;
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].len(), 1);
        let sent = &batches[0][0];
        assert_eq!(sent.to, "ExponentPushToken[abc]");
        assert_eq!(sent.title, "New follower");
        assert_eq!(sent.body, "Alice followed you");
        assert_eq!(sent.data, json!({"screen": "profile"}));
    }

    #[tokio::test]
    async fn test_push_deliver_multiple_tokens() {
        let tokens = store_with(
            "user-A",
            vec![("token-1", Platform::Ios), ("token-2", Platform::Android)],
        )
        .await;
        let (provider, state) = MockPushProvider::always_success();
        let sink = PushNotificationSink::with_provider(tokens, Arc::new(provider));

        let payload = json!({
            "title": "Test",
            "body": "Hello",
            "recipient_id": "user-A"
        });
        deliver_payload(&sink, payload).await.unwrap();

        let batches = state.received.lock().await;
        assert_eq!(batches[0].len(), 2);
        assert_eq!(batches[0][0].to, "token-1");
        assert_eq!(batches[0][1].to, "token-2");
    }

    #[tokio::test]
    async fn test_push_deliver_no_tokens_skips() {
        let tokens = Arc::new(DeviceTokenStore::new());
        let (provider, state) = MockPushProvider::always_success();
        let sink = PushNotificationSink::with_provider(tokens, Arc::new(provider));

        let payload = json!({
            "title": "Test",
            "recipient_id": "user-A"
        });
        deliver_payload(&sink, payload).await.unwrap();

        // Nothing is registered for the recipient, so the provider must not be called.
        assert_eq!(calls(&state), 0);
    }

    #[tokio::test]
    async fn test_push_deliver_no_recipient_error() {
        let tokens = Arc::new(DeviceTokenStore::new());
        let (provider, _state) = MockPushProvider::always_success();
        let sink = PushNotificationSink::with_provider(tokens, Arc::new(provider));

        let err = deliver_payload(&sink, json!({"title": "Test"}))
            .await
            .expect_err("delivery without a recipient must fail");
        assert!(err.to_string().contains("recipient_id"));
    }

    #[tokio::test]
    async fn test_push_retry_on_server_error() {
        let tokens = store_with("user-A", vec![("token-1", Platform::Ios)]).await;
        let script = vec![
            vec![PushResult::RetriableError("server error: 500".to_string())],
            vec![PushResult::Success],
        ];
        let (provider, state) = MockPushProvider::new(script);
        let sink =
            PushNotificationSink::with_config(tokens, Arc::new(provider), fast_retry_config());

        let payload = json!({
            "title": "Test",
            "recipient_id": "user-A"
        });
        deliver_payload(&sink, payload).await.unwrap();

        // One failed attempt followed by one successful retry.
        assert_eq!(calls(&state), 2);
    }

    #[tokio::test]
    async fn test_push_no_retry_on_permanent_error() {
        let tokens = store_with("user-A", vec![("token-1", Platform::Ios)]).await;
        let script = vec![vec![PushResult::PermanentError(
            "DeviceNotRegistered".to_string(),
        )]];
        let (provider, state) = MockPushProvider::new(script);
        let sink = PushNotificationSink::with_config(
            tokens.clone(),
            Arc::new(provider),
            fast_retry_config(),
        );

        let payload = json!({
            "title": "Test",
            "recipient_id": "user-A"
        });
        let outcome = deliver_payload(&sink, payload).await;

        assert!(outcome.is_ok());
        // A permanent error is not retried...
        assert_eq!(calls(&state), 1);
        // ...and the offending token is removed from the store.
        assert_eq!(tokens.token_count("user-A").await, 0);
    }

    #[tokio::test]
    async fn test_push_max_retries_exceeded() {
        let tokens = store_with("user-A", vec![("token-1", Platform::Ios)]).await;
        let script: Vec<Vec<PushResult>> = (1..=4)
            .map(|n| vec![PushResult::RetriableError(format!("error {}", n))])
            .collect();
        let (provider, state) = MockPushProvider::new(script);
        let sink =
            PushNotificationSink::with_config(tokens, Arc::new(provider), fast_retry_config());

        let payload = json!({
            "title": "Test",
            "recipient_id": "user-A"
        });
        let err = deliver_payload(&sink, payload)
            .await
            .expect_err("delivery must fail once retries are exhausted");
        assert!(err.to_string().contains("after 3 retries"));

        // The initial attempt plus three retries.
        assert_eq!(calls(&state), 4);
    }

    #[tokio::test]
    async fn test_push_explicit_recipient_overrides_payload() {
        let tokens = store_with("user-B", vec![("token-B", Platform::Ios)]).await;
        let (provider, state) = MockPushProvider::always_success();
        let sink = PushNotificationSink::with_provider(tokens, Arc::new(provider));

        // The payload names user-A, but the explicit argument names user-B.
        let payload = json!({
            "title": "Test",
            "recipient_id": "user-A"
        });
        sink.deliver(payload, Some("user-B"), &HashMap::new())
            .await
            .unwrap();

        let batches = state.received.lock().await;
        assert_eq!(batches[0][0].to, "token-B");
    }

    #[tokio::test]
    async fn test_push_message_serialization() {
        let msg = PushMessage {
            to: "ExponentPushToken[abc]".to_string(),
            title: "Hello".to_string(),
            body: "World".to_string(),
            data: json!({"screen": "home"}),
            sound: "default".to_string(),
        };

        let encoded = serde_json::to_value(&msg).unwrap();
        assert_eq!(encoded["to"], "ExponentPushToken[abc]");
        assert_eq!(encoded["title"], "Hello");
        assert_eq!(encoded["body"], "World");
        assert_eq!(encoded["data"]["screen"], "home");
        assert_eq!(encoded["sound"], "default");
    }

    #[tokio::test]
    async fn test_push_message_null_data_omitted() {
        let msg = PushMessage {
            to: "token".to_string(),
            title: "Test".to_string(),
            body: "Body".to_string(),
            data: Value::Null,
            sound: "default".to_string(),
        };

        let encoded = serde_json::to_value(&msg).unwrap();
        let fields = encoded.as_object().unwrap();
        assert!(!fields.contains_key("data"));
    }

    #[test]
    fn test_sink_name_and_type() {
        let tokens = Arc::new(DeviceTokenStore::new());
        let (provider, _state) = MockPushProvider::always_success();
        let sink = PushNotificationSink::with_provider(tokens, Arc::new(provider));

        assert_eq!(sink.name(), "push");
        assert_eq!(sink.sink_type(), SinkType::Push);
    }

    #[tokio::test]
    async fn test_push_with_preferences_disabled_type_skipped() {
        let tokens = store_with("user-A", vec![("token-1", Platform::Ios)]).await;
        let prefs = Arc::new(NotificationPreferencesStore::new());
        prefs.disable_type("user-A", "new_like").await;

        let (provider, state) = MockPushProvider::always_success();
        let sink =
            PushNotificationSink::with_provider(tokens, Arc::new(provider)).with_preferences(prefs);

        // The disabled notification type never reaches the provider.
        let disabled = json!({
            "title": "New like",
            "notification_type": "new_like",
            "recipient_id": "user-A"
        });
        deliver_payload(&sink, disabled).await.unwrap();
        assert_eq!(calls(&state), 0);

        // A different notification type is still delivered.
        let enabled = json!({
            "title": "New follower",
            "notification_type": "new_follower",
            "recipient_id": "user-A"
        });
        deliver_payload(&sink, enabled).await.unwrap();
        assert_eq!(calls(&state), 1);
    }

    #[tokio::test]
    async fn test_push_with_preferences_muted_user_skipped() {
        let tokens = store_with("user-A", vec![("token-1", Platform::Ios)]).await;
        let prefs = Arc::new(NotificationPreferencesStore::new());
        prefs.mute("user-A").await;

        let (provider, state) = MockPushProvider::always_success();
        let sink =
            PushNotificationSink::with_provider(tokens, Arc::new(provider)).with_preferences(prefs);

        let payload = json!({
            "title": "Test",
            "notification_type": "new_follower",
            "recipient_id": "user-A"
        });
        deliver_payload(&sink, payload).await.unwrap();
        assert_eq!(calls(&state), 0);
    }

    #[tokio::test]
    async fn test_push_without_preferences_delivers_all() {
        let tokens = store_with("user-A", vec![("token-1", Platform::Ios)]).await;
        let (provider, state) = MockPushProvider::always_success();
        let sink = PushNotificationSink::with_provider(tokens, Arc::new(provider));

        let payload = json!({
            "title": "Test",
            "notification_type": "new_like",
            "recipient_id": "user-A"
        });
        deliver_payload(&sink, payload).await.unwrap();
        assert_eq!(calls(&state), 1);
    }

    #[tokio::test]
    async fn test_push_permanent_error_unregisters_stale_token() {
        let tokens = store_with(
            "user-A",
            vec![
                ("good-token", Platform::Ios),
                ("stale-token", Platform::Android),
            ],
        )
        .await;
        assert_eq!(tokens.token_count("user-A").await, 2);

        // First token succeeds, second one is reported as permanently failed.
        let script = vec![vec![
            PushResult::Success,
            PushResult::PermanentError("DeviceNotRegistered".to_string()),
        ]];
        let (provider, _state) = MockPushProvider::new(script);
        let sink = PushNotificationSink::with_config(
            tokens.clone(),
            Arc::new(provider),
            fast_retry_config(),
        );

        let payload = json!({
            "title": "Test",
            "recipient_id": "user-A"
        });
        deliver_payload(&sink, payload).await.unwrap();

        assert_eq!(tokens.token_count("user-A").await, 1);
        let remaining = tokens.get_tokens("user-A").await;
        assert_eq!(remaining[0].token, "good-token");
    }

    #[tokio::test]
    async fn test_push_all_tokens_permanent_error_cleans_all() {
        let tokens = store_with(
            "user-A",
            vec![("dead-1", Platform::Ios), ("dead-2", Platform::Android)],
        )
        .await;

        let script = vec![vec![
            PushResult::PermanentError("DeviceNotRegistered".to_string()),
            PushResult::PermanentError("DeviceNotRegistered".to_string()),
        ]];
        let (provider, _state) = MockPushProvider::new(script);
        let sink = PushNotificationSink::with_config(
            tokens.clone(),
            Arc::new(provider),
            fast_retry_config(),
        );

        let payload = json!({
            "title": "Test",
            "recipient_id": "user-A"
        });
        deliver_payload(&sink, payload).await.unwrap();

        assert_eq!(tokens.token_count("user-A").await, 0);
    }
}