Skip to main content

plexus_comms/activations/push/
activation.rs

1use super::providers::PushRouter;
2use super::types::*;
3use crate::config::PushConfig;
4use async_stream::stream;
5use futures::Stream;
6use std::sync::Arc;
7
8// Required for macro-generated code
9use plexus_core::plexus;
10use plexus_core::serde_helpers;
11
12#[derive(Clone)]
13pub struct Push {
14    router: Arc<PushRouter>,
15}
16
17impl Push {
18    pub async fn new(config: PushConfig) -> Result<Self, String> {
19        let router = PushRouter::new(&config)?;
20
21        Ok(Self {
22            router: Arc::new(router),
23        })
24    }
25}
26
27#[plexus_macros::hub_methods(
28    namespace = "push",
29    version = "1.0.0",
30    description = "Send push notifications to iOS (APNs), Android (FCM), and Web"
31)]
32impl Push {
33    #[plexus_macros::hub_method(
34        description = "Send a push notification",
35        params(
36            device_token = "Device token",
37            platform = "Target platform (ios, android, web)",
38            title = "Notification title",
39            body = "Notification body",
40            data = "Custom data payload (optional)",
41            badge = "Badge count for iOS (optional)",
42            sound = "Sound file name (optional)"
43        )
44    )]
45    async fn send(
46        &self,
47        device_token: String,
48        platform: Platform,
49        title: String,
50        body: String,
51        data: Option<std::collections::HashMap<String, String>>,
52        badge: Option<i32>,
53        sound: Option<String>,
54    ) -> impl Stream<Item = SendPushEvent> + Send + 'static {
55        let router = self.router.clone();
56        let params = SendPushParams {
57            device_token,
58            platform,
59            title,
60            body,
61            data,
62            badge,
63            sound,
64        };
65
66        stream! {
67            match router.send(params).await {
68                Ok(event) => yield event,
69                Err(e) => yield SendPushEvent::Error {
70                    message: e,
71                    platform: None,
72                    code: None,
73                },
74            }
75        }
76    }
77
78    #[plexus_macros::hub_method(
79        streaming,
80        description = "Send multiple push notifications with progress tracking",
81        params(notifications = "List of push notifications to send")
82    )]
83    async fn send_batch(
84        &self,
85        notifications: Vec<SendPushParams>,
86    ) -> impl Stream<Item = BatchSendEvent> + Send + 'static {
87        let router = self.router.clone();
88        let total = notifications.len();
89
90        stream! {
91            let mut sent = 0;
92            let mut failed = 0;
93
94            for (index, notif) in notifications.into_iter().enumerate() {
95                let platform = notif.platform.clone();
96                match router.send(notif).await {
97                    Ok(SendPushEvent::Queued { message_id, .. }) |
98                    Ok(SendPushEvent::Sent { message_id, .. }) => {
99                        sent += 1;
100                        yield BatchSendEvent::NotificationSent { index, message_id, platform };
101                    }
102                    Ok(SendPushEvent::Error { message, .. }) | Err(message) => {
103                        failed += 1;
104                        yield BatchSendEvent::NotificationFailed { index, platform, error: message };
105                    }
106                }
107
108                if (index + 1) % 10 == 0 || index + 1 == total {
109                    yield BatchSendEvent::Progress {
110                        sent,
111                        total,
112                        percentage: ((sent + failed) as f32 / total as f32) * 100.0,
113                    };
114                }
115            }
116
117            yield BatchSendEvent::Complete {
118                total_sent: sent,
119                total_failed: failed,
120            };
121        }
122    }
123}