1use ::http::StatusCode;
2use std::{collections::HashMap, error::Error as StdError};
3use thiserror::Error;
4
5use async_trait::async_trait;
6use flatten_json_object::{ArrayFormatting, Flattener};
7use serde::{Deserialize, Serialize};
8use serde_repr::{Deserialize_repr, Serialize_repr};
9use tokio::sync::RwLock;
10
11#[cfg(all(feature = "mysql", feature = "mongo"))]
12compile_error!("mysql and mongo not enable both.");
13
14mod utils;
15
16#[cfg(feature = "apns")]
17pub mod apns;
18#[cfg(feature = "email")]
19pub mod email;
20#[cfg(feature = "fcm")]
21pub mod fcm;
22#[cfg(feature = "huawei")]
23pub mod huawei;
24#[cfg(feature = "rtm")]
25pub mod rtm;
26#[cfg(feature = "wecom")]
27pub mod wecom;
28#[cfg(feature = "xiaomi")]
29pub mod xiaomi;
30
31#[cfg(feature = "grpc")]
32pub mod grpc;
33
34#[cfg(feature = "http")]
35pub mod http;
36
37#[cfg(all(any(feature = "http", feature = "grpc")))]
38pub mod service;
39
40pub(crate) type BoxError = Box<dyn StdError + Send + Sync>;
41
42pub enum Transparent<'a> {
43 Text(&'a str),
44 Json(serde_json::Map<String, serde_json::Value>),
45}
46
47#[derive(Debug, Clone, Copy)]
48pub enum Body<'a> {
49 Transparent(&'a str),
50 Notify { title: &'a str, body: &'a str },
51}
52
53#[derive(Debug, Clone)]
54pub struct ApnsExtra<'a> {
55 pub topic: &'a str,
56 pub push_type: apns::ApnsPushType,
57}
58
59#[derive(Debug, Clone)]
60pub enum WecomExtra<'a> {
61 Markdown(bool),
62 Text { url: &'a str, btntxt: &'a str },
63}
64
65#[derive(Debug, Default, Serialize)]
66pub struct PushResult {
67 pub request_id: String,
68 pub code: String,
69 pub reason: String,
70 pub success: i64,
71 pub failure: i64,
72 pub invalid_tokens: Vec<String>,
73}
74
75#[derive(Debug, Default, Serialize)]
76pub struct PushResults {
77 pub success: i64,
78 pub failure: i64,
79 pub results: Vec<PushResult>,
80}
81
82#[derive(Debug, Clone)]
83pub struct Message<'a> {
84 pub tokens: &'a [&'a str],
85 pub body: Body<'a>,
86 pub android: Option<AndroidExtra<'a>>,
87 pub apns: Option<ApnsExtra<'a>>,
88 pub wecom: Option<WecomExtra<'a>>,
89}
90
91#[derive(Debug, Error)]
92#[error(transparent)]
93pub struct Error {
94 inner: InnerError,
95}
96
97#[derive(Debug, Error)]
98pub enum InnerError {
99 #[error("new client error: `{0}`")]
100 Client(String),
101 #[error("http error: `{0}`")]
102 Http(String),
103 #[error("format error: `{0}`")]
104 Format(String),
105 #[error("response error: `{0}`")]
106 Response(String),
107 #[error("biz error: `{0}`")]
108 Biz(String),
109 #[error("retryable error : `{0}`")]
110 RetryError(RetryError),
111 #[error("missing required parameter: `{0}`")]
112 MissingRequired(String),
113 #[error("invalid parameter: `{0}`")]
114 InvalidParams(String),
115 #[error("no exist client")]
116 NoExistClient,
117 #[error("serde error: `{0}`")]
118 Serde(BoxError),
119 #[error("message limit")]
120 MessageLimit,
121 #[error("token limit")]
122 TokenLimit,
123 #[error("unknown error: `{0}`")]
124 Unknown(String),
125 #[error("auth error: `{0}`")]
126 Auth(String),
127 #[error("UnsupportedType error: `{0}`")]
128 UnsupportedType(String),
129}
130
131impl From<serde_json::Error> for Error {
132 fn from(e: serde_json::Error) -> Self {
133 Self {
134 inner: InnerError::Serde(Box::new(e)),
135 }
136 }
137}
138
139impl From<reqwest::Error> for Error {
140 fn from(e: reqwest::Error) -> Self {
141 let inner = if e.is_builder() {
142 InnerError::Client(e.to_string())
143 } else if e.is_status() {
144 match e.status().unwrap() {
145 StatusCode::BAD_REQUEST => InnerError::MissingRequired(e.to_string()),
146 StatusCode::SERVICE_UNAVAILABLE => {
147 InnerError::RetryError(RetryError::Server(e.to_string()))
148 }
149 StatusCode::GATEWAY_TIMEOUT => {
150 InnerError::RetryError(RetryError::Timeout(e.to_string()))
151 }
152 StatusCode::REQUEST_TIMEOUT => {
153 InnerError::RetryError(RetryError::Timeout(e.to_string()))
154 }
155 StatusCode::UNAUTHORIZED => InnerError::RetryError(RetryError::Auth(e.to_string())),
156 _ => InnerError::Unknown(e.to_string()),
157 }
158 } else if e.is_timeout() {
159 InnerError::RetryError(RetryError::Timeout(e.to_string()))
160 } else if e.is_body() {
161 InnerError::Format(e.to_string())
162 } else if e.is_decode() {
163 InnerError::Serde(Box::new(e))
164 } else {
165 InnerError::Unknown(e.to_string())
166 };
167 Self { inner }
168 }
169}
170
171#[derive(Debug, Eq, PartialEq, Error)]
172pub enum RetryError {
173 #[error("auth error: `{0}`")]
174 Auth(String),
175 #[error("server error: `{0}`")]
176 Server(String),
177 #[error("timeout error: `{0}`")]
178 Timeout(String),
179 #[error("qps limiter")]
180 QPS,
181}
182
183impl From<RetryError> for Error {
184 fn from(e: RetryError) -> Self {
185 Self {
186 inner: InnerError::RetryError(e),
187 }
188 }
189}
190
191impl From<InnerError> for Error {
192 fn from(e: InnerError) -> Self {
193 Self { inner: e }
194 }
195}
196
197#[derive(Debug)]
198pub enum ErrorKind {
199 Http,
200 Format,
201 Auth,
202 Response,
203 Biz,
204}
205
206#[async_trait]
215pub trait Pusher<'b, M, R>
216where
217 M: Sync,
218 R: Send,
219{
220 const TOKEN_LIMIT: usize = 500;
221
222 async fn push(&self, msg: &'b M) -> Result<R, Error>;
223
224 async fn retry_push(&self, msg: &'b M) -> Result<R, Error> {
225 tokio_retry::RetryIf::spawn(
226 tokio_retry::strategy::ExponentialBackoff::from_millis(2)
227 .factor(1000)
228 .take(3),
229 || self.push(msg),
230 |e: &Error| match e.inner {
231 InnerError::RetryError(_) => true,
232 _ => false,
233 },
234 )
235 .await
236 }
237
238 async fn retry_batch_push(&self, msgs: &'b [M]) -> Result<Vec<R>, Error> {
239 let mut resps = Vec::new();
240 for msg in msgs {
241 let resp = self.retry_push(msg).await?;
242 resps.push(resp);
243 }
244 Ok(resps)
245 }
246}
247
248pub trait FromMessage<'a>: TryFrom<Message<'a>> {}
249
250impl<'a> TryFrom<Message<'a>> for xiaomi::Message<'a> {
251 type Error = Error;
252
253 fn try_from(msg: Message<'a>) -> Result<Self, Self::Error> {
254 let extra = msg.android.as_ref();
255 Ok(match msg.body {
256 Body::Transparent(data) => xiaomi::Message {
257 payload: Some(data),
258 pass_through: xiaomi::Passtrough::Transparent,
259 registration_id: Some(msg.tokens.join(",")),
260 ..Default::default()
261 },
262 Body::Notify { title, body, .. } => xiaomi::Message {
263 title,
264 description: body,
265 registration_id: Some(msg.tokens.join(",")),
266 pass_through: xiaomi::Passtrough::Notice,
267 restricted_package_name: extra.map_or("", |v| v.package_name),
268 extra: xiaomi::Extra {
269 notify_foreground: extra.map_or(None, |v| v.foreground_show.map(|v| v.into())),
270 notify_effect: extra.map_or(None, |v| {
271 if !v.click_action.is_empty() {
272 Some(xiaomi::NotifyEffect::Intent)
273 } else {
274 None
275 }
276 }),
277 intent_uri: extra.map_or(None, |v| {
278 if !v.click_action.is_empty() {
279 Some(v.click_action)
280 } else {
281 None
282 }
283 }),
284 job_key: extra.map_or(None, |v| {
285 if !v.click_action.is_empty() {
286 Some(v.click_action)
287 } else {
288 None
289 }
290 }),
291 ..Default::default()
292 }
293 .into(),
294 ..Default::default()
295 },
296 })
297 }
298}
299
300impl<'a> FromMessage<'a> for xiaomi::Message<'a> {}
301
302impl<'a> FromMessage<'a> for Vec<xiaomi::Message<'a>> {}
303
304impl<'a> TryFrom<Message<'a>> for Vec<xiaomi::Message<'a>> {
305 type Error = Error;
306
307 fn try_from(msg: Message<'a>) -> Result<Self, Self::Error> {
308 let tokens = msg.tokens;
309
310 let mi_msg: xiaomi::Message = msg.try_into()?;
311
312 let mut msgs = Vec::new();
313 for i in 0..(tokens.len() / 1000) {
314 let mut mi_tokens = Vec::new();
315
316 let mut msg = mi_msg.clone();
317 let start_index = i * 1000;
318 let end_index = if tokens.len() - start_index >= 1000 {
319 1000
320 } else {
321 tokens.len()
322 };
323 for j in start_index..end_index {
324 mi_tokens.push(tokens[i * 1000 + j]);
325 }
326 msg.registration_id = Some(mi_tokens.join(","));
327 msgs.push(msg);
328 }
329
330 Ok(msgs)
331 }
332}
333
334impl<'a> FromMessage<'a> for huawei::Message<'a> {}
335
336impl<'a> TryFrom<Message<'a>> for huawei::Message<'a> {
337 type Error = Error;
338
339 fn try_from(msg: Message<'a>) -> Result<Self, Self::Error> {
340 let extra = msg.android.as_ref();
341
342 match msg.body {
343 Body::Transparent(data) => Ok(huawei::Message {
344 validate_only: false,
345 message: huawei::InnerMessage {
346 data: Some(data),
347 android: Some(huawei::AndroidConfig {
348 ..Default::default()
349 }),
350 token: msg.tokens.to_vec(),
351 ..Default::default()
352 },
353 }),
354 Body::Notify { title, body } => Ok(huawei::Message {
355 validate_only: false,
356 message: huawei::InnerMessage {
357 token: msg.tokens.to_vec(),
358 notification: Some(huawei::Notification {
359 title,
360 body,
361 ..Default::default()
362 }),
363 android: Some(huawei::AndroidConfig {
364 collapse_key: extra.map_or(None, |e| e.collapse_key),
365 urgency: extra.map_or(None, |e| {
366 e.priority.as_ref().map_or(None, |e| match e {
367 Priority::High => Some(huawei::Urgency::High),
368 Priority::Normal => Some(huawei::Urgency::Normal),
369 })
370 }),
371 notification: Some(huawei::AndroidNotification {
373 sound: extra.map_or(None, |e| e.sound),
374 icon: extra.map_or(None, |e| e.icon),
375 image: extra.map_or(None, |e| Some(e.image)),
376 tag: extra.map_or(None, |e| e.tag),
377 body_loc_key: extra.map_or(None, |e| Some(e.body_loc_key)),
378 body_loc_args: extra.map_or(Default::default(), |e| e.body_loc_args),
379 title_loc_key: extra
380 .map_or(Default::default(), |e| Some(e.title_loc_key)),
381 title_loc_args: extra.map_or(Default::default(), |e| e.title_loc_args),
382 channel_id: extra.map_or(None, |e| Some(e.channel_id)),
383 ticker: extra.map_or(None, |e| Some(e.ticker)),
384 click_action: extra
385 .map(|e| huawei::ClickAction::new_intent(e.click_action))
386 .ok_or(InnerError::MissingRequired(
387 "missing click action".to_string(),
388 ))?,
389 visibility: extra.map_or(None, |e| match e.visibility {
390 Visibility::Unspecified => Some(""),
391 Visibility::Private => Some("PRIVATE"),
392 Visibility::Public => Some("PUBLIC"),
393 Visibility::Secret => Some("SECRET"),
394 }),
395 auto_clear: extra.map_or(None, |e| e.auto_clear),
396 foreground_show: extra.map_or(None, |e| e.foreground_show),
397 ..Default::default()
398 }),
399 ..Default::default()
400 }),
401 data: None,
402 },
403 }),
404 }
405 }
406}
407
408impl<'a> FromMessage<'a> for Vec<huawei::Message<'a>> {}
409
410impl<'a> TryFrom<Message<'a>> for Vec<huawei::Message<'a>> {
411 type Error = Error;
412
413 fn try_from(msg: Message<'a>) -> Result<Self, Self::Error> {
414 let tokens = msg.tokens;
415
416 let hw_msg: huawei::Message = msg.try_into()?;
417
418 let mut msgs = Vec::new();
419
420 for i in 0..(tokens.len() / 1000) + 1 {
421 let mut hw_tokens = Vec::new();
422
423 let mut msg = hw_msg.clone();
424
425 let start_index = i * 1000;
426 let end_index = if tokens.len() - start_index >= 1000 {
427 1000
428 } else {
429 tokens.len()
430 };
431 for j in start_index..end_index {
432 hw_tokens.push(tokens[j]);
433 }
434 msg.message.token = hw_tokens;
435 msgs.push(msg);
436 }
437
438 Ok(msgs)
439 }
440}
441
442impl<'a> FromMessage<'a> for fcm::MulticastMessage<'a> {}
443
444impl<'a> TryFrom<Message<'a>> for fcm::MulticastMessage<'a> {
445 type Error = Error;
446
447 fn try_from(msg: Message<'a>) -> Result<Self, Self::Error> {
448 let extra = msg.android.as_ref();
449
450 match msg.body {
451 Body::Transparent(data) => {
452 let data = if let Ok(val) = serde_json::from_str::<serde_json::Value>(data) {
453 let val = Flattener::new()
454 .set_key_separator(".")
455 .set_array_formatting(ArrayFormatting::Plain)
456 .set_preserve_empty_arrays(false)
457 .set_preserve_empty_objects(false)
458 .flatten(&val)
459 .unwrap();
460 serde_json::from_value::<HashMap<String, String>>(val)?
461 } else {
462 HashMap::from_iter([("text".to_string(), data.to_string())])
463 };
464 Ok(fcm::MulticastMessage {
465 data: Some(data),
466 tokens: msg.tokens.to_vec(),
467 ..Default::default()
468 })
469 }
470 Body::Notify { title, body } => Ok(fcm::MulticastMessage {
471 notification: Some(fcm::Notification {
472 body: Some(body.to_string()),
473 title: Some(title.to_string()),
474 ..Default::default()
475 }),
476 android: Some(fcm::AndroidConfig {
477 collapse_key: extra.map_or(None, |e| {
478 e.collapse_key.map_or(None, |e| Some(e.to_string()))
479 }),
480 priority: extra.map_or(None, |e| {
481 e.priority.as_ref().map_or(None, |e| match e {
482 Priority::High => Some("high".to_string()),
483 Priority::Normal => Some("normal".to_string()),
484 })
485 }),
486 ttl: extra.map_or(None, |e| e.ttl.map_or(None, |e| Some(format!("{}s", e)))),
487 notification: Some(fcm::AndroidNotification {
488 sound: extra
489 .map_or(None, |e| e.sound.map_or(None, |e| Some(e.to_string()))),
490 icon: extra.map_or(None, |e| e.icon.map_or(None, |e| Some(e.to_string()))),
491 image: extra.map_or(None, |e| Some(e.image.to_string())),
492 tag: extra.map_or(None, |e| e.tag.map_or(None, |e| Some(e.to_string()))),
493 body_loc_key: extra.map_or(None, |e| Some(e.body_loc_key.to_string())),
494 body_loc_args: extra.map_or(None, |e| {
495 Some(e.body_loc_args.iter().map(|e| e.to_string()).collect::<_>())
496 }),
497 title_loc_key: extra.map_or(None, |e| Some(e.title_loc_key.to_string())),
498 title_loc_args: extra.map_or(None, |e| {
499 Some(
500 e.title_loc_args
501 .iter()
502 .map(|e| e.to_string())
503 .collect::<_>(),
504 )
505 }),
506 channel_id: extra.map_or(None, |e| Some(e.channel_id.to_string())),
507 ticker: extra.map_or(None, |e| Some(e.ticker.to_string())),
508 click_action: extra.map_or(None, |e| Some(e.click_action.to_string())),
509 visibility: extra.map_or(None, |e| match e.visibility {
510 Visibility::Unspecified => Some("".to_string()),
511 Visibility::Private => Some("PRIVATE".to_string()),
512 Visibility::Public => Some("PUBLIC".to_string()),
513 Visibility::Secret => Some("SECRET".to_string()),
514 }),
515 ..Default::default()
516 }),
517 ..Default::default()
518 }),
519 ..Default::default()
520 }),
521 }
522 }
523}
524
525impl<'a> FromMessage<'a> for Vec<fcm::MulticastMessage<'a>> {}
526
527impl<'a> TryFrom<Message<'a>> for Vec<fcm::MulticastMessage<'a>> {
528 type Error = Error;
529
530 fn try_from(msg: Message<'a>) -> Result<Self, Self::Error> {
531 let tokens = msg.tokens;
532
533 let hw_msg: fcm::MulticastMessage = msg.try_into()?;
534
535 let mut msgs = Vec::new();
536
537 for i in 0..(tokens.len() / 1000) + 1 {
538 let mut fcm_tokens = Vec::new();
539
540 let mut msg = hw_msg.clone();
541
542 let start_index = i * 1000;
543 let end_index = if tokens.len() - start_index >= 1000 {
544 1000
545 } else {
546 tokens.len()
547 };
548 for j in start_index..end_index {
549 fcm_tokens.push(tokens[j]);
550 }
551 msg.tokens = fcm_tokens;
552 msgs.push(msg);
553 }
554
555 Ok(msgs)
556 }
557}
558
559#[cfg(feature = "wecom")]
560impl<'a> TryFrom<Message<'a>> for wecom::Message<'a> {
561 type Error = Error;
562
563 fn try_from(msg: Message<'a>) -> Result<Self, Self::Error> {
564 let extra = msg.wecom.as_ref();
565 match msg.body {
566 Body::Transparent(data) => wecom::MessageBuilder::default()
567 .inner(wecom::InnerMesssage::Text { content: data })
568 .build()
569 .map_err(|e| InnerError::Format(e.to_string()).into()),
570 Body::Notify { title, body } => {
571 let mut br = wecom::MessageBuilder::default();
572
573 if title.is_empty() {
574 br.inner(wecom::InnerMesssage::Text { content: body });
575 }
576
577 br.to(wecom::To::Touser(msg.tokens.join("|")));
578
579 if let Some(extra) = extra {
580 match extra {
581 WecomExtra::Markdown(mark) => {
582 if *mark {
583 br.inner(wecom::InnerMesssage::Markdown { content: body });
584 }
585 }
586 WecomExtra::Text { url, btntxt } => {
587 br.inner(wecom::InnerMesssage::Textcard {
588 title,
589 description: body,
590 url,
591 btntxt: Some(btntxt),
592 });
593 }
594 }
595 }
596 br.build()
597 .map_err(|e| InnerError::Format(e.to_string()).into())
598 }
599 }
600 }
601}
602
603#[cfg(feature = "wecom")]
604impl<'a> FromMessage<'a> for wecom::Message<'a> {}
605
606#[cfg(feature = "apns")]
607impl<'a> FromMessage<'a> for Vec<apns::Notification<'a>> {}
608
609#[cfg(feature = "apns")]
610impl<'a> TryFrom<Message<'a>> for Vec<apns::Notification<'a>> {
611 type Error = Error;
612
613 fn try_from(msg: Message<'a>) -> Result<Self, Self::Error> {
614 let extra = msg.apns.as_ref();
615
616 let mut resps = Vec::new();
617
618 for &token in msg.tokens {
619 let mut br = apns::NotificationBuilder::new(
620 extra
621 .ok_or(InnerError::Format("missing topic".to_string()))?
622 .topic,
623 token,
624 );
625
626 match msg.body {
627 Body::Transparent(data) => {
628 br.custom(HashMap::from_iter([("text", data)]));
629 }
630 Body::Notify { title, body } => {
631 br.title(title);
632 br.body(body);
633 }
634 }
635 resps.push(br.build());
636 }
637
638 Ok(resps)
639 }
640}
641
642#[cfg(feature = "email")]
643impl<'a> FromMessage<'a> for email::Message<'a> {}
644
645#[cfg(feature = "email")]
646impl<'a> TryFrom<Message<'a>> for email::Message<'a> {
647 type Error = Error;
648
649 fn try_from(msg: Message<'a>) -> Result<Self, Self::Error> {
650 match msg.body {
651 Body::Transparent(_) => {
652 Err(InnerError::UnsupportedType("Transparent".to_string()).into())
653 }
654 Body::Notify { title, body } => Ok(Self {
655 title: title,
656 body: body,
657 to: msg.tokens,
658 }),
659 }
660 }
661}
662
663#[derive(Debug, Serialize_repr, Deserialize_repr, Clone)]
664#[repr(u8)]
665pub enum Visibility {
666 Unspecified = 0,
667 Private = 1,
668 Public = 2,
669 Secret = 3,
670}
671
672#[derive(Debug, Serialize, Deserialize, Clone)]
673#[serde(rename_all = "UPPERCASE")]
674pub enum Priority {
675 High,
676 Normal,
677}
678
679#[derive(Debug, Clone)]
680pub struct AndroidExtra<'a> {
681 pub collapse_key: Option<i64>,
682 pub priority: Option<Priority>,
683 pub ttl: Option<i64>,
684 pub title: Option<&'a str>,
685 pub body: Option<&'a str>,
686 pub icon: Option<&'a str>,
687 pub color: Option<&'a str>,
688 pub sound: Option<&'a str>,
689 pub tag: Option<&'a str>,
690
691 pub click_action: &'a str,
693 pub body_loc_key: &'a str,
694 pub body_loc_args: &'a [&'a str],
695 pub title_loc_key: &'a str,
696 pub title_loc_args: &'a [&'a str],
697 pub channel_id: &'a str,
698 pub image: &'a str,
699 pub ticker: &'a str,
700 pub visibility: Visibility,
701 pub package_name: &'a str,
703 pub auto_clear: Option<i8>,
705 pub foreground_show: Option<bool>,
707 pub notify_id: Option<i32>,
709}
710
711#[derive(Debug, Serialize)]
712#[serde(rename_all = "lowercase")]
713pub enum PushType {
714 Alert,
715 Voip,
716}
717
718#[derive(Debug, Serialize)]
719pub struct IosExtra<'a> {
720 pub push_type: PushType,
721 pub topic: &'a str,
722}
723
724pub enum Client {
725 #[cfg(feature = "xiaomi")]
726 Mi(xiaomi::Client),
727 #[cfg(feature = "huawei")]
728 Huawei(huawei::Client),
729 #[cfg(feature = "fcm")]
730 Fcm(fcm::Client),
731 #[cfg(feature = "wecom")]
732 Wecom(wecom::Client),
733 #[cfg(feature = "apns")]
734 Apns(apns::Client),
735 #[cfg(feature = "email")]
736 Email(email::Client),
737 #[cfg(feature = "rtm")]
738 Rtm(rtm::Client),
739}
740
741struct Service {
742 pushers: RwLock<HashMap<String, Client>>, }
744
745impl Service {
746 pub fn new() -> Self {
748 Self {
749 pushers: Default::default(),
750 }
751 }
752
753 pub async fn remove_client(&self, ch_id: &str) {
757 let mut pushers = self.pushers.write().await;
758 (*pushers).remove(ch_id);
759 }
760
761 pub async fn register_client(&self, ch_id: &str, cli: Client) {
765 let mut pushers = self.pushers.write().await;
766 (*pushers).insert(ch_id.to_string(), cli);
767 }
768
769 pub async fn retry_batch_push(
773 &self,
774 ch_id: &str,
775 msg: Message<'_>,
776 ) -> Result<PushResults, Error> {
777 let pusher = self.pushers.read().await;
778 let pusher = (*pusher).get(ch_id).ok_or(InnerError::NoExistClient)?;
779
780 match pusher {
781 #[cfg(feature = "xiaomi")]
782 Client::Mi(client) => {
783 let token_len = msg.tokens.len();
784 let msg: Vec<xiaomi::Message> = msg.try_into()?;
785 let resp = client.retry_batch_push(msg.as_slice()).await?;
786
787 let mut res = PushResults::default();
788 for resp in resp.into_iter() {
789 res.results.push(PushResult {
790 request_id: resp.data.id,
791 code: resp.code.to_string(),
792 reason: resp.reason,
793 invalid_tokens: Vec::new(),
794 ..Default::default()
795 });
796 }
797 res.success = token_len as i64;
798 Ok(res)
799 }
800 #[cfg(feature = "huawei")]
801 Client::Huawei(client) => {
802 let msg: Vec<huawei::Message> = msg.try_into()?;
803 let resp = client.retry_batch_push(msg.as_slice()).await?;
804 let mut res = PushResults::default();
805 for resp in resp.into_iter() {
806 res.results.push(PushResult {
807 request_id: resp.request_id,
808 code: serde_json::to_string(&resp.code).unwrap(),
809 reason: resp.msg,
810 success: resp.success,
811 failure: resp.failure,
812 invalid_tokens: Vec::new(),
813 });
814 res.success += resp.success;
815 res.failure += resp.failure;
816 }
817 Ok(res)
818 }
819 #[cfg(feature = "fcm")]
820 Client::Fcm(client) => {
821 let msg: Vec<fcm::MulticastMessage> = msg.try_into()?;
822 let resp = client.retry_batch_push(msg.as_slice()).await?;
823
824 let mut res = PushResults::default();
825 for resp in resp.into_iter() {
826 for respons in resp.responses {
827 match respons {
828 fcm::SendResponse::Ok { message_id } => {
829 res.results.push(PushResult {
830 request_id: message_id,
831 ..Default::default()
832 });
833 }
834 fcm::SendResponse::Error { error, token } => {
835 res.results.push(PushResult {
836 invalid_tokens: vec![token],
837 reason: error,
838 ..Default::default()
839 });
840 }
841 }
842 }
843 res.success += resp.success_count;
844 res.failure += resp.failure_count;
845 }
846 Ok(res)
847 }
848 #[cfg(feature = "wecom")]
849 Client::Wecom(client) => {
850 let token_len = msg.tokens.len();
851 let msg: wecom::Message = msg.try_into()?;
852
853 let resp = client.retry_push(&msg).await?;
854
855 let invalid_tokens = resp.invaliduser.unwrap_or_default();
856 let invalid_tokens = if !invalid_tokens.is_empty() {
857 invalid_tokens
858 .split("|")
859 .map(|e| e.to_string())
860 .collect::<Vec<String>>()
861 } else {
862 vec![]
863 };
864
865 let res = PushResult {
866 request_id: resp.msgid,
867 code: resp.errcode.to_string(),
868 reason: resp.errmsg,
869 success: (token_len - invalid_tokens.len()) as i64,
870 failure: invalid_tokens.len() as i64,
871 invalid_tokens,
872 };
873
874 Ok(PushResults {
875 success: res.success,
876 failure: res.failure,
877 results: vec![res],
878 })
879 }
880 #[cfg(feature = "apns")]
881 Client::Apns(client) => {
882 let a: Vec<apns::Notification> = msg.try_into()?;
883 let resps = client.retry_batch_push(a.as_slice()).await?;
884 let mut res = PushResults::default();
885 for resp in resps {
886 if resp.status_code.is_success() {
887 res.results.push(PushResult {
888 request_id: resp.apns_id,
889 code: resp.status_code.to_string(),
890 reason: serde_json::to_string(&resp.reason).unwrap(),
891 success: 1,
892 failure: 0,
893 invalid_tokens: vec![],
894 });
895 res.success += 1;
896 } else {
897 res.results.push(PushResult {
898 request_id: resp.apns_id,
899 code: resp.status_code.to_string(),
900 reason: serde_json::to_string(&resp.reason).unwrap(),
901 success: 0,
902 failure: 1,
903 invalid_tokens: vec![resp.token],
904 });
905 res.failure += 1;
906 }
907 }
908 Ok(res)
909 }
910 #[cfg(feature = "email")]
911 Client::Email(client) => {
912 let mut res = PushResults::default();
913 let msg = msg.try_into()?;
914 let resp = client.retry_push(&msg).await?;
915 for resp in resp.results {
916 if resp.success {
917 res.success += 1;
918 res.results.push(PushResult {
919 success: 1,
920 ..Default::default()
921 });
922 } else {
923 res.failure += 1;
924 res.results.push(PushResult {
925 failure: 1,
926 invalid_tokens: vec![resp.email.to_string()],
927 reason: resp.reason.unwrap_or_default(),
928 ..Default::default()
929 });
930 }
931 }
932 Ok(res)
933 }
934 #[cfg(feature = "rtm")]
935 Client::Rtm(client) => {
936 let mut res = PushResults::default();
937 Ok(res)
938 }
939 }
940 }
941}
942
943#[cfg(test)]
944mod tests {
945 use flatten_json_object::{ArrayFormatting, Flattener};
946
947 use crate::{apns, fcm, huawei, xiaomi, Client, Service};
948
949 #[cfg(feature = "wecom")]
950 use crate::wecom;
951
952 #[tokio::test]
953 async fn test_services() {
954 let client_id = std::env::var("MI_CLIENT_ID").unwrap();
955 let client_secret = std::env::var("MI_CLIENT_SECRET").unwrap();
956 let project_id = std::env::var("MI_PROJECT_ID").unwrap();
957
958 let mi = xiaomi::Client::new(&xiaomi::Config {
959 client_id: &client_id,
960 client_secret: &client_secret,
961 project_id: &project_id,
962 })
963 .unwrap();
964
965 let client_id = std::env::var("WECOM_CLIENT_ID").unwrap();
966 let client_secret = std::env::var("WECOM_CLIENT_SECRET").unwrap();
967 let agent_id = std::env::var("WECOM_AGENT_ID")
968 .unwrap()
969 .parse::<i64>()
970 .expect("");
971
972 #[cfg(feature = "wecom")]
973 let wecom_ = wecom::Client::new(&client_id, &client_secret, agent_id)
974 .await
975 .unwrap();
976
977 let client_id = std::env::var("GOOGLE_CLIENT_ID").unwrap();
978 let client_email = std::env::var("GOOGLE_CLIENT_EMAIL").unwrap();
979 let private_id = std::env::var("GOOGLE_PRIVATE_ID").unwrap();
980 let private_key = std::env::var("GOOGLE_PRIVATE_KEY").unwrap();
981 let project_id = std::env::var("GOOGLE_PROJECT_ID").unwrap();
982
983 let mut fcm_ = fcm::Client::new(fcm::Config {
984 key_type: "service_account".to_string().into(),
985 client_id: client_id.into(),
986 private_key_id: private_id.into(),
987 private_key,
988 token_uri: "https://oauth2.googleapis.com/token".to_string(),
989 auth_uri: "https://accounts.google.com/o/oauth2/auth".to_string().into(),
990 project_id: project_id.into(),
991 client_email,
992 auth_provider_x509_cert_url: Some("https://www.googleapis.com/oauth2/v1/certs".to_string()),
993 client_x509_cert_url: Some("https://www.googleapis.com/robot/v1/metadata/x509/firebase-adminsdk-vle32%40avcf-7ea7e.iam.gserviceaccount.com".to_string()),
994 })
995 .await.unwrap();
996
997 fcm_.with_proxy(fcm::ProxyConfig {
998 addr: "socks5://127.0.0.1:7890",
999 user: None,
1000 pass: None,
1001 })
1002 .await;
1003
1004 let apns_ = apns::Client::new(b"", "pass").unwrap();
1005
1006 let hw = huawei::Client::new("", "").await.unwrap();
1007
1008 let svc = Service::new();
1009
1010 svc.register_client("mi", Client::Mi(mi)).await;
1011
1012 #[cfg(feature = "wecom")]
1013 svc.register_client("wecom", Client::Wecom(wecom_)).await;
1014
1015 svc.register_client("fcm", Client::Fcm(fcm_)).await;
1016 svc.register_client("apns", Client::Apns(apns_)).await;
1017 svc.register_client("hw", Client::Huawei(hw)).await;
1018 }
1019
1020 #[tokio::test]
1021 async fn message_to_message() {
1022 let str = r#"
1025 {
1026 "cmd":"cmd",
1027 "ts":1234,
1028 "opt":{
1029 "reqTs":123,
1030 "inviter":"nekilc"
1031 }
1032 }
1033 "#;
1034
1035 let val: serde_json::Value = serde_json::from_str(str).unwrap();
1036
1037 let res = Flattener::new()
1038 .set_key_separator(".")
1039 .set_array_formatting(ArrayFormatting::Plain)
1040 .set_preserve_empty_arrays(false)
1041 .set_preserve_empty_objects(false)
1042 .flatten(&val)
1043 .unwrap();
1044 let res = serde_json::to_string_pretty(&res).unwrap();
1045 println!("{}", res);
1046
1047 for group in ["1", "2", "3", "4", "5"].rsplitn(2, |e| false) {
1048 println!("{:?}", group);
1049 }
1050 }
1051
1052 #[test]
1053 #[cfg(feature = "huawei")]
1054 fn test_huawei_msg() {
1055 use crate::{huawei, Body, Message};
1056 let mut tokens = Vec::new();
1057 for i in 0..1100 {
1058 tokens.push(i.to_string());
1059 }
1060 let mut tokens_ = Vec::new();
1061 for i in 0..tokens.len() {
1062 tokens_.push(tokens[i].as_str());
1063 }
1064 let msg = Message {
1065 tokens: tokens_.as_slice(),
1066 body: Body::Transparent(""),
1067 android: None,
1068 apns: None,
1069 wecom: None,
1070 };
1071 let hw_msg: huawei::Message = msg.clone().try_into().unwrap();
1072 assert_eq!(hw_msg.message.token.len(), 1100);
1073
1074 let hw_msgs: Vec<huawei::Message> = msg.try_into().unwrap();
1075 assert_eq!(hw_msgs.len(), 2);
1076 assert_eq!(hw_msgs[0].message.token.len(), 1000);
1077 assert_eq!(hw_msgs[1].message.token.len(), 100);
1078 }
1079
1080 #[test]
1081 #[cfg(feature = "fcm")]
1082 fn test_fcm_msg() {
1083 use crate::{fcm, Body, Message};
1084 let mut tokens = Vec::new();
1085 for i in 0..1100 {
1086 tokens.push(i.to_string());
1087 }
1088 let mut tokens_ = Vec::new();
1089 for i in 0..tokens.len() {
1090 tokens_.push(tokens[i].as_str());
1091 }
1092 let msg = Message {
1093 tokens: tokens_.as_slice(),
1094 body: Body::Transparent(""),
1095 android: None,
1096 apns: None,
1097 wecom: None,
1098 };
1099 let hw_msg: fcm::MulticastMessage = msg.clone().try_into().unwrap();
1100 assert_eq!(hw_msg.tokens.len(), 1100);
1101
1102 let hw_msgs: Vec<fcm::MulticastMessage> = msg.try_into().unwrap();
1103 assert_eq!(hw_msgs.len(), 2);
1104 assert_eq!(hw_msgs[0].tokens.len(), 1000);
1105 assert_eq!(hw_msgs[1].tokens.len(), 100);
1106 }
1107}