hi_push/
lib.rs

1use ::http::StatusCode;
2use std::{collections::HashMap, error::Error as StdError};
3use thiserror::Error;
4
5use async_trait::async_trait;
6use flatten_json_object::{ArrayFormatting, Flattener};
7use serde::{Deserialize, Serialize};
8use serde_repr::{Deserialize_repr, Serialize_repr};
9use tokio::sync::RwLock;
10
11#[cfg(all(feature = "mysql", feature = "mongo"))]
12compile_error!("mysql and mongo not enable both.");
13
14mod utils;
15
16#[cfg(feature = "apns")]
17pub mod apns;
18#[cfg(feature = "email")]
19pub mod email;
20#[cfg(feature = "fcm")]
21pub mod fcm;
22#[cfg(feature = "huawei")]
23pub mod huawei;
24#[cfg(feature = "rtm")]
25pub mod rtm;
26#[cfg(feature = "wecom")]
27pub mod wecom;
28#[cfg(feature = "xiaomi")]
29pub mod xiaomi;
30
31#[cfg(feature = "grpc")]
32pub mod grpc;
33
34#[cfg(feature = "http")]
35pub mod http;
36
37#[cfg(all(any(feature = "http", feature = "grpc")))]
38pub mod service;
39
40pub(crate) type BoxError = Box<dyn StdError + Send + Sync>;
41
42pub enum Transparent<'a> {
43    Text(&'a str),
44    Json(serde_json::Map<String, serde_json::Value>),
45}
46
47#[derive(Debug, Clone, Copy)]
48pub enum Body<'a> {
49    Transparent(&'a str),
50    Notify { title: &'a str, body: &'a str },
51}
52
53#[derive(Debug, Clone)]
54pub struct ApnsExtra<'a> {
55    pub topic: &'a str,
56    pub push_type: apns::ApnsPushType,
57}
58
59#[derive(Debug, Clone)]
60pub enum WecomExtra<'a> {
61    Markdown(bool),
62    Text { url: &'a str, btntxt: &'a str },
63}
64
65#[derive(Debug, Default, Serialize)]
66pub struct PushResult {
67    pub request_id: String,
68    pub code: String,
69    pub reason: String,
70    pub success: i64,
71    pub failure: i64,
72    pub invalid_tokens: Vec<String>,
73}
74
75#[derive(Debug, Default, Serialize)]
76pub struct PushResults {
77    pub success: i64,
78    pub failure: i64,
79    pub results: Vec<PushResult>,
80}
81
82#[derive(Debug, Clone)]
83pub struct Message<'a> {
84    pub tokens: &'a [&'a str],
85    pub body: Body<'a>,
86    pub android: Option<AndroidExtra<'a>>,
87    pub apns: Option<ApnsExtra<'a>>,
88    pub wecom: Option<WecomExtra<'a>>,
89}
90
91#[derive(Debug, Error)]
92#[error(transparent)]
93pub struct Error {
94    inner: InnerError,
95}
96
97#[derive(Debug, Error)]
98pub enum InnerError {
99    #[error("new client error: `{0}`")]
100    Client(String),
101    #[error("http error: `{0}`")]
102    Http(String),
103    #[error("format error: `{0}`")]
104    Format(String),
105    #[error("response error: `{0}`")]
106    Response(String),
107    #[error("biz error: `{0}`")]
108    Biz(String),
109    #[error("retryable error : `{0}`")]
110    RetryError(RetryError),
111    #[error("missing required parameter: `{0}`")]
112    MissingRequired(String),
113    #[error("invalid parameter: `{0}`")]
114    InvalidParams(String),
115    #[error("no exist client")]
116    NoExistClient,
117    #[error("serde error: `{0}`")]
118    Serde(BoxError),
119    #[error("message limit")]
120    MessageLimit,
121    #[error("token limit")]
122    TokenLimit,
123    #[error("unknown error: `{0}`")]
124    Unknown(String),
125    #[error("auth error: `{0}`")]
126    Auth(String),
127    #[error("UnsupportedType error: `{0}`")]
128    UnsupportedType(String),
129}
130
131impl From<serde_json::Error> for Error {
132    fn from(e: serde_json::Error) -> Self {
133        Self {
134            inner: InnerError::Serde(Box::new(e)),
135        }
136    }
137}
138
139impl From<reqwest::Error> for Error {
140    fn from(e: reqwest::Error) -> Self {
141        let inner = if e.is_builder() {
142            InnerError::Client(e.to_string())
143        } else if e.is_status() {
144            match e.status().unwrap() {
145                StatusCode::BAD_REQUEST => InnerError::MissingRequired(e.to_string()),
146                StatusCode::SERVICE_UNAVAILABLE => {
147                    InnerError::RetryError(RetryError::Server(e.to_string()))
148                }
149                StatusCode::GATEWAY_TIMEOUT => {
150                    InnerError::RetryError(RetryError::Timeout(e.to_string()))
151                }
152                StatusCode::REQUEST_TIMEOUT => {
153                    InnerError::RetryError(RetryError::Timeout(e.to_string()))
154                }
155                StatusCode::UNAUTHORIZED => InnerError::RetryError(RetryError::Auth(e.to_string())),
156                _ => InnerError::Unknown(e.to_string()),
157            }
158        } else if e.is_timeout() {
159            InnerError::RetryError(RetryError::Timeout(e.to_string()))
160        } else if e.is_body() {
161            InnerError::Format(e.to_string())
162        } else if e.is_decode() {
163            InnerError::Serde(Box::new(e))
164        } else {
165            InnerError::Unknown(e.to_string())
166        };
167        Self { inner }
168    }
169}
170
171#[derive(Debug, Eq, PartialEq, Error)]
172pub enum RetryError {
173    #[error("auth error: `{0}`")]
174    Auth(String),
175    #[error("server error: `{0}`")]
176    Server(String),
177    #[error("timeout error: `{0}`")]
178    Timeout(String),
179    #[error("qps limiter")]
180    QPS,
181}
182
183impl From<RetryError> for Error {
184    fn from(e: RetryError) -> Self {
185        Self {
186            inner: InnerError::RetryError(e),
187        }
188    }
189}
190
191impl From<InnerError> for Error {
192    fn from(e: InnerError) -> Self {
193        Self { inner: e }
194    }
195}
196
197#[derive(Debug)]
198pub enum ErrorKind {
199    Http,
200    Format,
201    Auth,
202    Response,
203    Biz,
204}
205
206// Pusher
207// Message pushing abstract.
208// limit:
209// - max token number
210// - retryable
211// - qps
212// - body
213// - msg field
214#[async_trait]
215pub trait Pusher<'b, M, R>
216where
217    M: Sync,
218    R: Send,
219{
220    const TOKEN_LIMIT: usize = 500;
221
222    async fn push(&self, msg: &'b M) -> Result<R, Error>;
223
224    async fn retry_push(&self, msg: &'b M) -> Result<R, Error> {
225        tokio_retry::RetryIf::spawn(
226            tokio_retry::strategy::ExponentialBackoff::from_millis(2)
227                .factor(1000)
228                .take(3),
229            || self.push(msg),
230            |e: &Error| match e.inner {
231                InnerError::RetryError(_) => true,
232                _ => false,
233            },
234        )
235        .await
236    }
237
238    async fn retry_batch_push(&self, msgs: &'b [M]) -> Result<Vec<R>, Error> {
239        let mut resps = Vec::new();
240        for msg in msgs {
241            let resp = self.retry_push(msg).await?;
242            resps.push(resp);
243        }
244        Ok(resps)
245    }
246}
247
248pub trait FromMessage<'a>: TryFrom<Message<'a>> {}
249
250impl<'a> TryFrom<Message<'a>> for xiaomi::Message<'a> {
251    type Error = Error;
252
253    fn try_from(msg: Message<'a>) -> Result<Self, Self::Error> {
254        let extra = msg.android.as_ref();
255        Ok(match msg.body {
256            Body::Transparent(data) => xiaomi::Message {
257                payload: Some(data),
258                pass_through: xiaomi::Passtrough::Transparent,
259                registration_id: Some(msg.tokens.join(",")),
260                ..Default::default()
261            },
262            Body::Notify { title, body, .. } => xiaomi::Message {
263                title,
264                description: body,
265                registration_id: Some(msg.tokens.join(",")),
266                pass_through: xiaomi::Passtrough::Notice,
267                restricted_package_name: extra.map_or("", |v| v.package_name),
268                extra: xiaomi::Extra {
269                    notify_foreground: extra.map_or(None, |v| v.foreground_show.map(|v| v.into())),
270                    notify_effect: extra.map_or(None, |v| {
271                        if !v.click_action.is_empty() {
272                            Some(xiaomi::NotifyEffect::Intent)
273                        } else {
274                            None
275                        }
276                    }),
277                    intent_uri: extra.map_or(None, |v| {
278                        if !v.click_action.is_empty() {
279                            Some(v.click_action)
280                        } else {
281                            None
282                        }
283                    }),
284                    job_key: extra.map_or(None, |v| {
285                        if !v.click_action.is_empty() {
286                            Some(v.click_action)
287                        } else {
288                            None
289                        }
290                    }),
291                    ..Default::default()
292                }
293                .into(),
294                ..Default::default()
295            },
296        })
297    }
298}
299
300impl<'a> FromMessage<'a> for xiaomi::Message<'a> {}
301
302impl<'a> FromMessage<'a> for Vec<xiaomi::Message<'a>> {}
303
304impl<'a> TryFrom<Message<'a>> for Vec<xiaomi::Message<'a>> {
305    type Error = Error;
306
307    fn try_from(msg: Message<'a>) -> Result<Self, Self::Error> {
308        let tokens = msg.tokens;
309
310        let mi_msg: xiaomi::Message = msg.try_into()?;
311
312        let mut msgs = Vec::new();
313        for i in 0..(tokens.len() / 1000) {
314            let mut mi_tokens = Vec::new();
315
316            let mut msg = mi_msg.clone();
317            let start_index = i * 1000;
318            let end_index = if tokens.len() - start_index >= 1000 {
319                1000
320            } else {
321                tokens.len()
322            };
323            for j in start_index..end_index {
324                mi_tokens.push(tokens[i * 1000 + j]);
325            }
326            msg.registration_id = Some(mi_tokens.join(","));
327            msgs.push(msg);
328        }
329
330        Ok(msgs)
331    }
332}
333
334impl<'a> FromMessage<'a> for huawei::Message<'a> {}
335
336impl<'a> TryFrom<Message<'a>> for huawei::Message<'a> {
337    type Error = Error;
338
339    fn try_from(msg: Message<'a>) -> Result<Self, Self::Error> {
340        let extra = msg.android.as_ref();
341
342        match msg.body {
343            Body::Transparent(data) => Ok(huawei::Message {
344                validate_only: false,
345                message: huawei::InnerMessage {
346                    data: Some(data),
347                    android: Some(huawei::AndroidConfig {
348                        ..Default::default()
349                    }),
350                    token: msg.tokens.to_vec(),
351                    ..Default::default()
352                },
353            }),
354            Body::Notify { title, body } => Ok(huawei::Message {
355                validate_only: false,
356                message: huawei::InnerMessage {
357                    token: msg.tokens.to_vec(),
358                    notification: Some(huawei::Notification {
359                        title,
360                        body,
361                        ..Default::default()
362                    }),
363                    android: Some(huawei::AndroidConfig {
364                        collapse_key: extra.map_or(None, |e| e.collapse_key),
365                        urgency: extra.map_or(None, |e| {
366                            e.priority.as_ref().map_or(None, |e| match e {
367                                Priority::High => Some(huawei::Urgency::High),
368                                Priority::Normal => Some(huawei::Urgency::Normal),
369                            })
370                        }),
371                        // ttl: extra.map_or(None, |e| e.ttl.map_or(None, |e| Some(e))),
372                        notification: Some(huawei::AndroidNotification {
373                            sound: extra.map_or(None, |e| e.sound),
374                            icon: extra.map_or(None, |e| e.icon),
375                            image: extra.map_or(None, |e| Some(e.image)),
376                            tag: extra.map_or(None, |e| e.tag),
377                            body_loc_key: extra.map_or(None, |e| Some(e.body_loc_key)),
378                            body_loc_args: extra.map_or(Default::default(), |e| e.body_loc_args),
379                            title_loc_key: extra
380                                .map_or(Default::default(), |e| Some(e.title_loc_key)),
381                            title_loc_args: extra.map_or(Default::default(), |e| e.title_loc_args),
382                            channel_id: extra.map_or(None, |e| Some(e.channel_id)),
383                            ticker: extra.map_or(None, |e| Some(e.ticker)),
384                            click_action: extra
385                                .map(|e| huawei::ClickAction::new_intent(e.click_action))
386                                .ok_or(InnerError::MissingRequired(
387                                    "missing click action".to_string(),
388                                ))?,
389                            visibility: extra.map_or(None, |e| match e.visibility {
390                                Visibility::Unspecified => Some(""),
391                                Visibility::Private => Some("PRIVATE"),
392                                Visibility::Public => Some("PUBLIC"),
393                                Visibility::Secret => Some("SECRET"),
394                            }),
395                            auto_clear: extra.map_or(None, |e| e.auto_clear),
396                            foreground_show: extra.map_or(None, |e| e.foreground_show),
397                            ..Default::default()
398                        }),
399                        ..Default::default()
400                    }),
401                    data: None,
402                },
403            }),
404        }
405    }
406}
407
408impl<'a> FromMessage<'a> for Vec<huawei::Message<'a>> {}
409
410impl<'a> TryFrom<Message<'a>> for Vec<huawei::Message<'a>> {
411    type Error = Error;
412
413    fn try_from(msg: Message<'a>) -> Result<Self, Self::Error> {
414        let tokens = msg.tokens;
415
416        let hw_msg: huawei::Message = msg.try_into()?;
417
418        let mut msgs = Vec::new();
419
420        for i in 0..(tokens.len() / 1000) + 1 {
421            let mut hw_tokens = Vec::new();
422
423            let mut msg = hw_msg.clone();
424
425            let start_index = i * 1000;
426            let end_index = if tokens.len() - start_index >= 1000 {
427                1000
428            } else {
429                tokens.len()
430            };
431            for j in start_index..end_index {
432                hw_tokens.push(tokens[j]);
433            }
434            msg.message.token = hw_tokens;
435            msgs.push(msg);
436        }
437
438        Ok(msgs)
439    }
440}
441
442impl<'a> FromMessage<'a> for fcm::MulticastMessage<'a> {}
443
444impl<'a> TryFrom<Message<'a>> for fcm::MulticastMessage<'a> {
445    type Error = Error;
446
447    fn try_from(msg: Message<'a>) -> Result<Self, Self::Error> {
448        let extra = msg.android.as_ref();
449
450        match msg.body {
451            Body::Transparent(data) => {
452                let data = if let Ok(val) = serde_json::from_str::<serde_json::Value>(data) {
453                    let val = Flattener::new()
454                        .set_key_separator(".")
455                        .set_array_formatting(ArrayFormatting::Plain)
456                        .set_preserve_empty_arrays(false)
457                        .set_preserve_empty_objects(false)
458                        .flatten(&val)
459                        .unwrap();
460                    serde_json::from_value::<HashMap<String, String>>(val)?
461                } else {
462                    HashMap::from_iter([("text".to_string(), data.to_string())])
463                };
464                Ok(fcm::MulticastMessage {
465                    data: Some(data),
466                    tokens: msg.tokens.to_vec(),
467                    ..Default::default()
468                })
469            }
470            Body::Notify { title, body } => Ok(fcm::MulticastMessage {
471                notification: Some(fcm::Notification {
472                    body: Some(body.to_string()),
473                    title: Some(title.to_string()),
474                    ..Default::default()
475                }),
476                android: Some(fcm::AndroidConfig {
477                    collapse_key: extra.map_or(None, |e| {
478                        e.collapse_key.map_or(None, |e| Some(e.to_string()))
479                    }),
480                    priority: extra.map_or(None, |e| {
481                        e.priority.as_ref().map_or(None, |e| match e {
482                            Priority::High => Some("high".to_string()),
483                            Priority::Normal => Some("normal".to_string()),
484                        })
485                    }),
486                    ttl: extra.map_or(None, |e| e.ttl.map_or(None, |e| Some(format!("{}s", e)))),
487                    notification: Some(fcm::AndroidNotification {
488                        sound: extra
489                            .map_or(None, |e| e.sound.map_or(None, |e| Some(e.to_string()))),
490                        icon: extra.map_or(None, |e| e.icon.map_or(None, |e| Some(e.to_string()))),
491                        image: extra.map_or(None, |e| Some(e.image.to_string())),
492                        tag: extra.map_or(None, |e| e.tag.map_or(None, |e| Some(e.to_string()))),
493                        body_loc_key: extra.map_or(None, |e| Some(e.body_loc_key.to_string())),
494                        body_loc_args: extra.map_or(None, |e| {
495                            Some(e.body_loc_args.iter().map(|e| e.to_string()).collect::<_>())
496                        }),
497                        title_loc_key: extra.map_or(None, |e| Some(e.title_loc_key.to_string())),
498                        title_loc_args: extra.map_or(None, |e| {
499                            Some(
500                                e.title_loc_args
501                                    .iter()
502                                    .map(|e| e.to_string())
503                                    .collect::<_>(),
504                            )
505                        }),
506                        channel_id: extra.map_or(None, |e| Some(e.channel_id.to_string())),
507                        ticker: extra.map_or(None, |e| Some(e.ticker.to_string())),
508                        click_action: extra.map_or(None, |e| Some(e.click_action.to_string())),
509                        visibility: extra.map_or(None, |e| match e.visibility {
510                            Visibility::Unspecified => Some("".to_string()),
511                            Visibility::Private => Some("PRIVATE".to_string()),
512                            Visibility::Public => Some("PUBLIC".to_string()),
513                            Visibility::Secret => Some("SECRET".to_string()),
514                        }),
515                        ..Default::default()
516                    }),
517                    ..Default::default()
518                }),
519                ..Default::default()
520            }),
521        }
522    }
523}
524
525impl<'a> FromMessage<'a> for Vec<fcm::MulticastMessage<'a>> {}
526
527impl<'a> TryFrom<Message<'a>> for Vec<fcm::MulticastMessage<'a>> {
528    type Error = Error;
529
530    fn try_from(msg: Message<'a>) -> Result<Self, Self::Error> {
531        let tokens = msg.tokens;
532
533        let hw_msg: fcm::MulticastMessage = msg.try_into()?;
534
535        let mut msgs = Vec::new();
536
537        for i in 0..(tokens.len() / 1000) + 1 {
538            let mut fcm_tokens = Vec::new();
539
540            let mut msg = hw_msg.clone();
541
542            let start_index = i * 1000;
543            let end_index = if tokens.len() - start_index >= 1000 {
544                1000
545            } else {
546                tokens.len()
547            };
548            for j in start_index..end_index {
549                fcm_tokens.push(tokens[j]);
550            }
551            msg.tokens = fcm_tokens;
552            msgs.push(msg);
553        }
554
555        Ok(msgs)
556    }
557}
558
559#[cfg(feature = "wecom")]
560impl<'a> TryFrom<Message<'a>> for wecom::Message<'a> {
561    type Error = Error;
562
563    fn try_from(msg: Message<'a>) -> Result<Self, Self::Error> {
564        let extra = msg.wecom.as_ref();
565        match msg.body {
566            Body::Transparent(data) => wecom::MessageBuilder::default()
567                .inner(wecom::InnerMesssage::Text { content: data })
568                .build()
569                .map_err(|e| InnerError::Format(e.to_string()).into()),
570            Body::Notify { title, body } => {
571                let mut br = wecom::MessageBuilder::default();
572
573                if title.is_empty() {
574                    br.inner(wecom::InnerMesssage::Text { content: body });
575                }
576
577                br.to(wecom::To::Touser(msg.tokens.join("|")));
578
579                if let Some(extra) = extra {
580                    match extra {
581                        WecomExtra::Markdown(mark) => {
582                            if *mark {
583                                br.inner(wecom::InnerMesssage::Markdown { content: body });
584                            }
585                        }
586                        WecomExtra::Text { url, btntxt } => {
587                            br.inner(wecom::InnerMesssage::Textcard {
588                                title,
589                                description: body,
590                                url,
591                                btntxt: Some(btntxt),
592                            });
593                        }
594                    }
595                }
596                br.build()
597                    .map_err(|e| InnerError::Format(e.to_string()).into())
598            }
599        }
600    }
601}
602
603#[cfg(feature = "wecom")]
604impl<'a> FromMessage<'a> for wecom::Message<'a> {}
605
606#[cfg(feature = "apns")]
607impl<'a> FromMessage<'a> for Vec<apns::Notification<'a>> {}
608
609#[cfg(feature = "apns")]
610impl<'a> TryFrom<Message<'a>> for Vec<apns::Notification<'a>> {
611    type Error = Error;
612
613    fn try_from(msg: Message<'a>) -> Result<Self, Self::Error> {
614        let extra = msg.apns.as_ref();
615
616        let mut resps = Vec::new();
617
618        for &token in msg.tokens {
619            let mut br = apns::NotificationBuilder::new(
620                extra
621                    .ok_or(InnerError::Format("missing topic".to_string()))?
622                    .topic,
623                token,
624            );
625
626            match msg.body {
627                Body::Transparent(data) => {
628                    br.custom(HashMap::from_iter([("text", data)]));
629                }
630                Body::Notify { title, body } => {
631                    br.title(title);
632                    br.body(body);
633                }
634            }
635            resps.push(br.build());
636        }
637
638        Ok(resps)
639    }
640}
641
642#[cfg(feature = "email")]
643impl<'a> FromMessage<'a> for email::Message<'a> {}
644
645#[cfg(feature = "email")]
646impl<'a> TryFrom<Message<'a>> for email::Message<'a> {
647    type Error = Error;
648
649    fn try_from(msg: Message<'a>) -> Result<Self, Self::Error> {
650        match msg.body {
651            Body::Transparent(_) => {
652                Err(InnerError::UnsupportedType("Transparent".to_string()).into())
653            }
654            Body::Notify { title, body } => Ok(Self {
655                title: title,
656                body: body,
657                to: msg.tokens,
658            }),
659        }
660    }
661}
662
663#[derive(Debug, Serialize_repr, Deserialize_repr, Clone)]
664#[repr(u8)]
665pub enum Visibility {
666    Unspecified = 0,
667    Private = 1,
668    Public = 2,
669    Secret = 3,
670}
671
672#[derive(Debug, Serialize, Deserialize, Clone)]
673#[serde(rename_all = "UPPERCASE")]
674pub enum Priority {
675    High,
676    Normal,
677}
678
679#[derive(Debug, Clone)]
680pub struct AndroidExtra<'a> {
681    pub collapse_key: Option<i64>,
682    pub priority: Option<Priority>,
683    pub ttl: Option<i64>,
684    pub title: Option<&'a str>,
685    pub body: Option<&'a str>,
686    pub icon: Option<&'a str>,
687    pub color: Option<&'a str>,
688    pub sound: Option<&'a str>,
689    pub tag: Option<&'a str>,
690
691    // huawei required
692    pub click_action: &'a str,
693    pub body_loc_key: &'a str,
694    pub body_loc_args: &'a [&'a str],
695    pub title_loc_key: &'a str,
696    pub title_loc_args: &'a [&'a str],
697    pub channel_id: &'a str,
698    pub image: &'a str,
699    pub ticker: &'a str,
700    pub visibility: Visibility,
701    // xiaomi required
702    pub package_name: &'a str,
703    // huawei
704    pub auto_clear: Option<i8>,
705    // huawei and xiaomi
706    pub foreground_show: Option<bool>,
707    // xiaomi
708    pub notify_id: Option<i32>,
709}
710
711#[derive(Debug, Serialize)]
712#[serde(rename_all = "lowercase")]
713pub enum PushType {
714    Alert,
715    Voip,
716}
717
718#[derive(Debug, Serialize)]
719pub struct IosExtra<'a> {
720    pub push_type: PushType,
721    pub topic: &'a str,
722}
723
724pub enum Client {
725    #[cfg(feature = "xiaomi")]
726    Mi(xiaomi::Client),
727    #[cfg(feature = "huawei")]
728    Huawei(huawei::Client),
729    #[cfg(feature = "fcm")]
730    Fcm(fcm::Client),
731    #[cfg(feature = "wecom")]
732    Wecom(wecom::Client),
733    #[cfg(feature = "apns")]
734    Apns(apns::Client),
735    #[cfg(feature = "email")]
736    Email(email::Client),
737    #[cfg(feature = "rtm")]
738    Rtm(rtm::Client),
739}
740
741struct Service {
742    pushers: RwLock<HashMap<String, Client>>, // ch_id <=> client
743}
744
745impl Service {
746    // fn new(client_id: &'a str, client_secret: &'a str) -> Self {
747    pub fn new() -> Self {
748        Self {
749            pushers: Default::default(),
750        }
751    }
752
753    /*
754        Remove client from service.
755    */
756    pub async fn remove_client(&self, ch_id: &str) {
757        let mut pushers = self.pushers.write().await;
758        (*pushers).remove(ch_id);
759    }
760
761    /*
762        Add client to service. Repeated.
763    */
764    pub async fn register_client(&self, ch_id: &str, cli: Client) {
765        let mut pushers = self.pushers.write().await;
766        (*pushers).insert(ch_id.to_string(), cli);
767    }
768
769    /*
770        Push message to third server by client_id.
771    */
772    pub async fn retry_batch_push(
773        &self,
774        ch_id: &str,
775        msg: Message<'_>,
776    ) -> Result<PushResults, Error> {
777        let pusher = self.pushers.read().await;
778        let pusher = (*pusher).get(ch_id).ok_or(InnerError::NoExistClient)?;
779
780        match pusher {
781            #[cfg(feature = "xiaomi")]
782            Client::Mi(client) => {
783                let token_len = msg.tokens.len();
784                let msg: Vec<xiaomi::Message> = msg.try_into()?;
785                let resp = client.retry_batch_push(msg.as_slice()).await?;
786
787                let mut res = PushResults::default();
788                for resp in resp.into_iter() {
789                    res.results.push(PushResult {
790                        request_id: resp.data.id,
791                        code: resp.code.to_string(),
792                        reason: resp.reason,
793                        invalid_tokens: Vec::new(),
794                        ..Default::default()
795                    });
796                }
797                res.success = token_len as i64;
798                Ok(res)
799            }
800            #[cfg(feature = "huawei")]
801            Client::Huawei(client) => {
802                let msg: Vec<huawei::Message> = msg.try_into()?;
803                let resp = client.retry_batch_push(msg.as_slice()).await?;
804                let mut res = PushResults::default();
805                for resp in resp.into_iter() {
806                    res.results.push(PushResult {
807                        request_id: resp.request_id,
808                        code: serde_json::to_string(&resp.code).unwrap(),
809                        reason: resp.msg,
810                        success: resp.success,
811                        failure: resp.failure,
812                        invalid_tokens: Vec::new(),
813                    });
814                    res.success += resp.success;
815                    res.failure += resp.failure;
816                }
817                Ok(res)
818            }
819            #[cfg(feature = "fcm")]
820            Client::Fcm(client) => {
821                let msg: Vec<fcm::MulticastMessage> = msg.try_into()?;
822                let resp = client.retry_batch_push(msg.as_slice()).await?;
823
824                let mut res = PushResults::default();
825                for resp in resp.into_iter() {
826                    for respons in resp.responses {
827                        match respons {
828                            fcm::SendResponse::Ok { message_id } => {
829                                res.results.push(PushResult {
830                                    request_id: message_id,
831                                    ..Default::default()
832                                });
833                            }
834                            fcm::SendResponse::Error { error, token } => {
835                                res.results.push(PushResult {
836                                    invalid_tokens: vec![token],
837                                    reason: error,
838                                    ..Default::default()
839                                });
840                            }
841                        }
842                    }
843                    res.success += resp.success_count;
844                    res.failure += resp.failure_count;
845                }
846                Ok(res)
847            }
848            #[cfg(feature = "wecom")]
849            Client::Wecom(client) => {
850                let token_len = msg.tokens.len();
851                let msg: wecom::Message = msg.try_into()?;
852
853                let resp = client.retry_push(&msg).await?;
854
855                let invalid_tokens = resp.invaliduser.unwrap_or_default();
856                let invalid_tokens = if !invalid_tokens.is_empty() {
857                    invalid_tokens
858                        .split("|")
859                        .map(|e| e.to_string())
860                        .collect::<Vec<String>>()
861                } else {
862                    vec![]
863                };
864
865                let res = PushResult {
866                    request_id: resp.msgid,
867                    code: resp.errcode.to_string(),
868                    reason: resp.errmsg,
869                    success: (token_len - invalid_tokens.len()) as i64,
870                    failure: invalid_tokens.len() as i64,
871                    invalid_tokens,
872                };
873
874                Ok(PushResults {
875                    success: res.success,
876                    failure: res.failure,
877                    results: vec![res],
878                })
879            }
880            #[cfg(feature = "apns")]
881            Client::Apns(client) => {
882                let a: Vec<apns::Notification> = msg.try_into()?;
883                let resps = client.retry_batch_push(a.as_slice()).await?;
884                let mut res = PushResults::default();
885                for resp in resps {
886                    if resp.status_code.is_success() {
887                        res.results.push(PushResult {
888                            request_id: resp.apns_id,
889                            code: resp.status_code.to_string(),
890                            reason: serde_json::to_string(&resp.reason).unwrap(),
891                            success: 1,
892                            failure: 0,
893                            invalid_tokens: vec![],
894                        });
895                        res.success += 1;
896                    } else {
897                        res.results.push(PushResult {
898                            request_id: resp.apns_id,
899                            code: resp.status_code.to_string(),
900                            reason: serde_json::to_string(&resp.reason).unwrap(),
901                            success: 0,
902                            failure: 1,
903                            invalid_tokens: vec![resp.token],
904                        });
905                        res.failure += 1;
906                    }
907                }
908                Ok(res)
909            }
910            #[cfg(feature = "email")]
911            Client::Email(client) => {
912                let mut res = PushResults::default();
913                let msg = msg.try_into()?;
914                let resp = client.retry_push(&msg).await?;
915                for resp in resp.results {
916                    if resp.success {
917                        res.success += 1;
918                        res.results.push(PushResult {
919                            success: 1,
920                            ..Default::default()
921                        });
922                    } else {
923                        res.failure += 1;
924                        res.results.push(PushResult {
925                            failure: 1,
926                            invalid_tokens: vec![resp.email.to_string()],
927                            reason: resp.reason.unwrap_or_default(),
928                            ..Default::default()
929                        });
930                    }
931                }
932                Ok(res)
933            }
934            #[cfg(feature = "rtm")]
935            Client::Rtm(client) => {
936                let mut res = PushResults::default();
937                Ok(res)
938            }
939        }
940    }
941}
942
943#[cfg(test)]
944mod tests {
945    use flatten_json_object::{ArrayFormatting, Flattener};
946
947    use crate::{apns, fcm, huawei, xiaomi, Client, Service};
948
949    #[cfg(feature = "wecom")]
950    use crate::wecom;
951
952    #[tokio::test]
953    async fn test_services() {
954        let client_id = std::env::var("MI_CLIENT_ID").unwrap();
955        let client_secret = std::env::var("MI_CLIENT_SECRET").unwrap();
956        let project_id = std::env::var("MI_PROJECT_ID").unwrap();
957
958        let mi = xiaomi::Client::new(&xiaomi::Config {
959            client_id: &client_id,
960            client_secret: &client_secret,
961            project_id: &project_id,
962        })
963        .unwrap();
964
965        let client_id = std::env::var("WECOM_CLIENT_ID").unwrap();
966        let client_secret = std::env::var("WECOM_CLIENT_SECRET").unwrap();
967        let agent_id = std::env::var("WECOM_AGENT_ID")
968            .unwrap()
969            .parse::<i64>()
970            .expect("");
971
972        #[cfg(feature = "wecom")]
973        let wecom_ = wecom::Client::new(&client_id, &client_secret, agent_id)
974            .await
975            .unwrap();
976
977        let client_id = std::env::var("GOOGLE_CLIENT_ID").unwrap();
978        let client_email = std::env::var("GOOGLE_CLIENT_EMAIL").unwrap();
979        let private_id = std::env::var("GOOGLE_PRIVATE_ID").unwrap();
980        let private_key = std::env::var("GOOGLE_PRIVATE_KEY").unwrap();
981        let project_id = std::env::var("GOOGLE_PROJECT_ID").unwrap();
982
983        let mut fcm_ = fcm::Client::new(fcm::Config {
984            key_type: "service_account".to_string().into(),
985            client_id: client_id.into(),
986            private_key_id: private_id.into(),
987            private_key,
988            token_uri: "https://oauth2.googleapis.com/token".to_string(),
989            auth_uri: "https://accounts.google.com/o/oauth2/auth".to_string().into(),
990            project_id: project_id.into(),
991            client_email,
992            auth_provider_x509_cert_url: Some("https://www.googleapis.com/oauth2/v1/certs".to_string()),
993            client_x509_cert_url: Some("https://www.googleapis.com/robot/v1/metadata/x509/firebase-adminsdk-vle32%40avcf-7ea7e.iam.gserviceaccount.com".to_string()),
994        })
995            .await.unwrap();
996
997        fcm_.with_proxy(fcm::ProxyConfig {
998            addr: "socks5://127.0.0.1:7890",
999            user: None,
1000            pass: None,
1001        })
1002        .await;
1003
1004        let apns_ = apns::Client::new(b"", "pass").unwrap();
1005
1006        let hw = huawei::Client::new("", "").await.unwrap();
1007
1008        let svc = Service::new();
1009
1010        svc.register_client("mi", Client::Mi(mi)).await;
1011
1012        #[cfg(feature = "wecom")]
1013        svc.register_client("wecom", Client::Wecom(wecom_)).await;
1014
1015        svc.register_client("fcm", Client::Fcm(fcm_)).await;
1016        svc.register_client("apns", Client::Apns(apns_)).await;
1017        svc.register_client("hw", Client::Huawei(hw)).await;
1018    }
1019
1020    #[tokio::test]
1021    async fn message_to_message() {
1022        // let msg = super::Message::default();
1023        // let xiaomi: xiaomi::Message = msg.into();
1024        let str = r#"
1025            {
1026                "cmd":"cmd",
1027                "ts":1234,
1028                "opt":{
1029                    "reqTs":123,
1030                    "inviter":"nekilc"
1031                }
1032            }
1033        "#;
1034
1035        let val: serde_json::Value = serde_json::from_str(str).unwrap();
1036
1037        let res = Flattener::new()
1038            .set_key_separator(".")
1039            .set_array_formatting(ArrayFormatting::Plain)
1040            .set_preserve_empty_arrays(false)
1041            .set_preserve_empty_objects(false)
1042            .flatten(&val)
1043            .unwrap();
1044        let res = serde_json::to_string_pretty(&res).unwrap();
1045        println!("{}", res);
1046
1047        for group in ["1", "2", "3", "4", "5"].rsplitn(2, |e| false) {
1048            println!("{:?}", group);
1049        }
1050    }
1051
1052    #[test]
1053    #[cfg(feature = "huawei")]
1054    fn test_huawei_msg() {
1055        use crate::{huawei, Body, Message};
1056        let mut tokens = Vec::new();
1057        for i in 0..1100 {
1058            tokens.push(i.to_string());
1059        }
1060        let mut tokens_ = Vec::new();
1061        for i in 0..tokens.len() {
1062            tokens_.push(tokens[i].as_str());
1063        }
1064        let msg = Message {
1065            tokens: tokens_.as_slice(),
1066            body: Body::Transparent(""),
1067            android: None,
1068            apns: None,
1069            wecom: None,
1070        };
1071        let hw_msg: huawei::Message = msg.clone().try_into().unwrap();
1072        assert_eq!(hw_msg.message.token.len(), 1100);
1073
1074        let hw_msgs: Vec<huawei::Message> = msg.try_into().unwrap();
1075        assert_eq!(hw_msgs.len(), 2);
1076        assert_eq!(hw_msgs[0].message.token.len(), 1000);
1077        assert_eq!(hw_msgs[1].message.token.len(), 100);
1078    }
1079
1080    #[test]
1081    #[cfg(feature = "fcm")]
1082    fn test_fcm_msg() {
1083        use crate::{fcm, Body, Message};
1084        let mut tokens = Vec::new();
1085        for i in 0..1100 {
1086            tokens.push(i.to_string());
1087        }
1088        let mut tokens_ = Vec::new();
1089        for i in 0..tokens.len() {
1090            tokens_.push(tokens[i].as_str());
1091        }
1092        let msg = Message {
1093            tokens: tokens_.as_slice(),
1094            body: Body::Transparent(""),
1095            android: None,
1096            apns: None,
1097            wecom: None,
1098        };
1099        let hw_msg: fcm::MulticastMessage = msg.clone().try_into().unwrap();
1100        assert_eq!(hw_msg.tokens.len(), 1100);
1101
1102        let hw_msgs: Vec<fcm::MulticastMessage> = msg.try_into().unwrap();
1103        assert_eq!(hw_msgs.len(), 2);
1104        assert_eq!(hw_msgs[0].tokens.len(), 1000);
1105        assert_eq!(hw_msgs[1].tokens.len(), 100);
1106    }
1107}