plexus_comms/activations/push/
activation.rs1use super::providers::PushRouter;
2use super::types::*;
3use crate::config::PushConfig;
4use async_stream::stream;
5use futures::Stream;
6use std::sync::Arc;
7
8use plexus_core::plexus;
10use plexus_core::serde_helpers;
11
12#[derive(Clone)]
13pub struct Push {
14 router: Arc<PushRouter>,
15}
16
17impl Push {
18 pub async fn new(config: PushConfig) -> Result<Self, String> {
19 let router = PushRouter::new(&config)?;
20
21 Ok(Self {
22 router: Arc::new(router),
23 })
24 }
25}
26
27#[plexus_macros::hub_methods(
28 namespace = "push",
29 version = "1.0.0",
30 description = "Send push notifications to iOS (APNs), Android (FCM), and Web"
31)]
32impl Push {
33 #[plexus_macros::hub_method(
34 description = "Send a push notification",
35 params(
36 device_token = "Device token",
37 platform = "Target platform (ios, android, web)",
38 title = "Notification title",
39 body = "Notification body",
40 data = "Custom data payload (optional)",
41 badge = "Badge count for iOS (optional)",
42 sound = "Sound file name (optional)"
43 )
44 )]
45 async fn send(
46 &self,
47 device_token: String,
48 platform: Platform,
49 title: String,
50 body: String,
51 data: Option<std::collections::HashMap<String, String>>,
52 badge: Option<i32>,
53 sound: Option<String>,
54 ) -> impl Stream<Item = SendPushEvent> + Send + 'static {
55 let router = self.router.clone();
56 let params = SendPushParams {
57 device_token,
58 platform,
59 title,
60 body,
61 data,
62 badge,
63 sound,
64 };
65
66 stream! {
67 match router.send(params).await {
68 Ok(event) => yield event,
69 Err(e) => yield SendPushEvent::Error {
70 message: e,
71 platform: None,
72 code: None,
73 },
74 }
75 }
76 }
77
78 #[plexus_macros::hub_method(
79 streaming,
80 description = "Send multiple push notifications with progress tracking",
81 params(notifications = "List of push notifications to send")
82 )]
83 async fn send_batch(
84 &self,
85 notifications: Vec<SendPushParams>,
86 ) -> impl Stream<Item = BatchSendEvent> + Send + 'static {
87 let router = self.router.clone();
88 let total = notifications.len();
89
90 stream! {
91 let mut sent = 0;
92 let mut failed = 0;
93
94 for (index, notif) in notifications.into_iter().enumerate() {
95 let platform = notif.platform.clone();
96 match router.send(notif).await {
97 Ok(SendPushEvent::Queued { message_id, .. }) |
98 Ok(SendPushEvent::Sent { message_id, .. }) => {
99 sent += 1;
100 yield BatchSendEvent::NotificationSent { index, message_id, platform };
101 }
102 Ok(SendPushEvent::Error { message, .. }) | Err(message) => {
103 failed += 1;
104 yield BatchSendEvent::NotificationFailed { index, platform, error: message };
105 }
106 }
107
108 if (index + 1) % 10 == 0 || index + 1 == total {
109 yield BatchSendEvent::Progress {
110 sent,
111 total,
112 percentage: ((sent + failed) as f32 / total as f32) * 100.0,
113 };
114 }
115 }
116
117 yield BatchSendEvent::Complete {
118 total_sent: sent,
119 total_failed: failed,
120 };
121 }
122 }
123}