hi_push/service/
mod.rs

1pub mod model;
2
3#[cfg(feature = "mongo")]
4pub mod mongo;
5
6use anyhow::anyhow;
7use std::collections::HashMap;
8
9#[cfg(feature = "mongo")]
10pub use mongo::*;
11
12use crate::service::model::Body;
13use crate::{apns, email, fcm, huawei, rtm, xiaomi, PushResults};
14
15#[cfg(feature = "wecom")]
16use crate::wecom;
17
18pub enum Database {
19    #[cfg(feature = "mongo")]
20    Mongo(mongodb::Database),
21}
22
23pub enum Message {
24    Transparent(model::PushTransparentParams),
25    Notification(model::PushNotificationParams),
26}
27
28pub struct App {
29    service: super::Service,
30    #[cfg(feature = "mongo")]
31    db: Database,
32    channels: Vec<model::Channel>,
33}
34
35impl App {
36    #[cfg(feature = "mysql")]
37    pub async fn new(db: sea_orm::Database) -> App {
38        let svcs = super::Service::new();
39        Self {
40            service: svcs,
41            db,
42            channels: Vec::new(),
43        }
44    }
45
46    #[cfg(feature = "mongo")]
47    pub async fn new(mongodb: mongodb::Database) -> App {
48        let svcs = super::Service::new();
49        Self {
50            service: svcs,
51            db: Database::Mongo(mongodb),
52            channels: Vec::new(),
53        }
54    }
55
56    pub async fn init(&mut self) -> anyhow::Result<()> {
57        let chans = match &self.db {
58            Database::Mongo(db) => fetch_all_channels(db).await?,
59        };
60        self.channels = chans;
61        for chan in &self.channels {
62            self.register_client(chan).await?;
63        }
64        Ok(())
65    }
66
67    /// Channel ids on running
68    pub async fn running_ch_ids(&self) -> Vec<String> {
69        let pushers = self.service.pushers.read().await;
70        pushers.iter().map(|e| e.0.clone()).collect()
71    }
72
73    /// Add channel client to memory
74    pub async fn register_client(&self, chan: &model::Channel) -> anyhow::Result<()> {
75        match Self::new_client(chan).await {
76            Ok(cli) => {
77                tracing::info!("load channel:{}", chan.ch_id);
78                self.service.register_client(&chan.ch_id, cli).await;
79            }
80            Err(e) => {
81                tracing::error!("load channel error:`{}` {} ", chan.ch_id, e);
82            }
83        }
84        Ok(())
85    }
86
87    /// Remove channel client from memory.
88    pub async fn deregister_client(&self, ch_id: &str) {
89        self.service.remove_client(ch_id).await;
90    }
91
92    /// Create a new channel client by config.
93    pub async fn new_client(conf: &model::Channel) -> anyhow::Result<super::Client> {
94        Ok(match conf._type {
95            #[cfg(feature = "xiaomi")]
96            model::ChannelType::Mi => super::Client::Mi(xiaomi::Client::new(&xiaomi::Config {
97                client_id: conf
98                    .client_id
99                    .as_ref()
100                    .ok_or(anyhow!("mi missing `client_id`"))?
101                    .as_str(),
102                client_secret: conf
103                    .client_secret
104                    .as_ref()
105                    .ok_or(anyhow!("mi missing `client_secret`"))?
106                    .as_str(),
107                project_id: conf
108                    .project_id
109                    .as_ref()
110                    .ok_or(anyhow!("mi missing `project_id`"))?
111                    .as_str(),
112            })?),
113            #[cfg(feature = "huawei")]
114            model::ChannelType::Huawei => super::Client::Huawei(
115                huawei::Client::new(
116                    conf.client_id
117                        .as_ref()
118                        .ok_or(anyhow!("mi missing `client_id`"))?
119                        .as_str(),
120                    conf.client_secret
121                        .as_ref()
122                        .ok_or(anyhow!("mi missing `client_id`"))?
123                        .as_str(),
124                )
125                .await?,
126            ),
127            #[cfg(feature = "fcm")]
128            model::ChannelType::Fcm => super::Client::Fcm(
129                fcm::Client::new(fcm::Config {
130                    key_type: Some(
131                        conf.key_type
132                            .clone()
133                            .ok_or(anyhow!("Fcm missing `key_type`"))?,
134                    ),
135                    project_id: Some(
136                        conf.project_id
137                            .clone()
138                            .ok_or(anyhow!("Fcm missing `project_id`"))?,
139                    ),
140                    private_key_id: Some(
141                        conf.private_key_id
142                            .clone()
143                            .ok_or(anyhow!("Fcm missing `private_key_id`"))?,
144                    ),
145                    private_key: (conf
146                        .private_key
147                        .clone()
148                        .ok_or(anyhow!("Fcm missing `private_key`"))?),
149                    client_email: (conf
150                        .client_email
151                        .clone()
152                        .ok_or(anyhow!("Fcm missing `client_email`"))?),
153                    client_id: Some(
154                        conf.client_id
155                            .clone()
156                            .ok_or(anyhow!("Fcm missing `client_id`"))?,
157                    ),
158                    auth_uri: Some(
159                        conf.auth_uri
160                            .clone()
161                            .ok_or(anyhow!("Fcm missing `auth_uri`"))?,
162                    ),
163                    token_uri: conf
164                        .token_uri
165                        .clone()
166                        .ok_or(anyhow!("Fcm missing `token_uri`"))?,
167                    auth_provider_x509_cert_url: Some(
168                        conf.auth_provider_x509_cert_url
169                            .clone()
170                            .ok_or(anyhow!("Fcm missing `auth_provider_x509_cert_url`"))?,
171                    ),
172                    client_x509_cert_url: Some(
173                        conf.client_x509_cert_url
174                            .clone()
175                            .ok_or(anyhow!("Fcm missing `client_x509_cert_url`"))?,
176                    ),
177                })
178                .await?,
179            ),
180            #[cfg(feature = "wecom")]
181            model::ChannelType::Wecom => super::Client::Wecom(
182                wecom::Client::new(
183                    conf.client_id
184                        .as_ref()
185                        .ok_or(anyhow!("Wecom missing `client_id`"))?
186                        .as_str(),
187                    conf.client_secret
188                        .as_ref()
189                        .ok_or(anyhow!("Wecom missing `client_secret`"))?
190                        .as_str(),
191                    conf.agentid.ok_or(anyhow!("Wecom missing `agentid`"))?,
192                )
193                .await?,
194            ),
195            #[cfg(feature = "apns")]
196            model::ChannelType::Apns => super::Client::Apns(apns::Client::new(
197                conf.certs
198                    .as_ref()
199                    .ok_or(anyhow!("Apns missing `certs`"))?
200                    .as_slice(),
201                conf.client_secret
202                    .as_ref()
203                    .ok_or(anyhow!("Apns missing `client_secret`"))?
204                    .as_str(),
205            )?),
206            #[cfg(feature = "email")]
207            model::ChannelType::Email => super::Client::Email(
208                email::Client::new(
209                    conf.client_id
210                        .as_ref()
211                        .ok_or(anyhow!("Email missing `client_id`"))?
212                        .as_str(),
213                    conf.client_secret
214                        .as_ref()
215                        .ok_or(anyhow!("Email missing `client_secret`"))?
216                        .as_str(),
217                    conf.addr
218                        .as_ref()
219                        .ok_or(anyhow!("Email missing `addr`"))?
220                        .as_str(),
221                )
222                .await,
223            ),
224            #[cfg(feature = "rtm")]
225            model::ChannelType::Rtm => super::Client::Rtm(rtm::Client::new(
226                conf.client_id
227                    .as_ref()
228                    .ok_or(anyhow!("Rtm missing `client_id`"))?
229                    .as_str(),
230                conf.client_secret
231                    .as_ref()
232                    .ok_or(anyhow!("Rtm missing `client_secret`"))?
233                    .as_str(),
234            )?),
235            model::ChannelType::Unknown => Err(anyhow!("Unknown channel type"))?,
236        })
237    }
238
239    /// Push message
240    pub async fn push_message(
241        &self,
242        client_id: &str,
243        msg: Message,
244    ) -> anyhow::Result<model::PushResp> {
245        let (groups, channels) = match &msg {
246            Message::Transparent(msg) => (msg.groups.as_slice(), msg.channels.as_slice()),
247            Message::Notification(msg) => (msg.groups.as_slice(), msg.channels.as_slice()),
248        };
249
250        let tokens = match &self.db {
251            #[cfg(feature = "mongo")]
252            Database::Mongo(db) => {
253                valid_client_id_and_ch_ids(db, client_id, channels).await?;
254
255                fetch_tokens(db, channels, groups).await?
256            }
257        };
258
259        let mut token_map: HashMap<&str, Vec<&str>> = HashMap::new();
260
261        // collect token by channel
262        for token in &tokens {
263            if let Some(vec) = token_map.get_mut(token.ch_id.as_str()) {
264                vec.push(&token.token);
265            } else {
266                let vec = vec![token.token.as_str()];
267                token_map.insert(&token.ch_id, vec);
268            }
269        }
270
271        let mut push_results = model::PushResp {
272            success: 0,
273            failure: 0,
274            results: Default::default(),
275        };
276
277        let body = match &msg {
278            Message::Transparent(msg) => match &msg.body {
279                Body::Json(_) => super::Body::Transparent(""),
280                Body::Text(text) => super::Body::Transparent(text),
281            },
282            Message::Notification(msg) => super::Body::Notify {
283                title: msg.title.as_str(),
284                body: msg.body.as_str(),
285            },
286        };
287
288        // push message by channel
289        for (chan, tokens) in token_map {
290            let push_res = self
291                .service
292                .retry_batch_push(
293                    &chan,
294                    super::Message {
295                        tokens: &tokens,
296                        body: body,
297                        android: None,
298                        apns: None,
299                        wecom: None,
300                    },
301                )
302                .await?;
303            push_results.success += push_res.success;
304            push_results.failure += push_res.failure;
305            push_results.results.insert(chan.to_string(), push_res);
306        }
307        Ok(push_results)
308    }
309
310    /// Register token
311    pub async fn register_token(
312        &self,
313        client_id: &str,
314        group: &str,
315        ch_id: &str,
316        token: &str,
317        _override: Option<bool>,
318    ) -> anyhow::Result<()> {
319        match &self.db {
320            #[cfg(feature = "mongo")]
321            Database::Mongo(db) => {
322                valid_client_id_and_ch_id(db, client_id, ch_id).await?;
323                insert_token(db, client_id, ch_id, group, token, _override).await?
324            }
325        };
326        Ok(())
327    }
328
329    /// Revoke token
330    pub async fn revoke_token(
331        &self,
332        client_id: &str,
333        group: &str,
334        ch_id: &str,
335        token: &str,
336    ) -> anyhow::Result<()> {
337        match &self.db {
338            #[cfg(feature = "mongo")]
339            Database::Mongo(db) => {
340                valid_client_id_and_ch_id(db, client_id, ch_id).await?;
341                revoke_token(db, ch_id, group, token).await?
342            }
343        };
344        Ok(())
345    }
346
347    /// Create a channel.
348    pub async fn create_channel(
349        &self,
350        app_id: &str,
351        params: model::PublicChannel,
352    ) -> anyhow::Result<String> {
353        let channel = match &self.db {
354            #[cfg(feature = "mongo")]
355            Database::Mongo(db) => create_channel(db, app_id, params).await?,
356        };
357        let cli = Self::new_client(&channel).await?;
358        let _ = self
359            .service
360            .register_client(channel.ch_id.as_str(), cli)
361            .await;
362        Ok(channel.ch_id)
363    }
364
365    /// Fetch channels from database belong client_id .
366    pub async fn fetch_channels(&self, client_id: &str) -> anyhow::Result<Vec<model::Channel>> {
367        let channels = match &self.db {
368            #[cfg(feature = "mongo")]
369            Database::Mongo(db) => fetch_channels_by_client_id(db, client_id).await?,
370        };
371        Ok(channels)
372    }
373
374    /// Create application.
375    pub async fn create_app(&self, name: &str) -> anyhow::Result<model::App> {
376        let app = match &self.db {
377            #[cfg(feature = "mongo")]
378            Database::Mongo(db) => create_app(db, name).await?,
379        };
380        Ok(app)
381    }
382
383    /// Fecth applications from database.
384    pub async fn fetch_apps(&self) -> anyhow::Result<Vec<model::App>> {
385        let apps = match &self.db {
386            #[cfg(feature = "mongo")]
387            Database::Mongo(db) => fetch_apps(db).await?,
388        };
389        Ok(apps)
390    }
391
392    /// Delete application from database.
393    pub async fn delete_app(&self, client_id: &str, client_secret: &str) -> anyhow::Result<()> {
394        let _ = match &self.db {
395            #[cfg(feature = "mongo")]
396            Database::Mongo(db) => delete_app(db, client_id, client_secret).await?,
397        };
398        Ok(())
399    }
400
401    /// Delete channel from database.
402    pub async fn delete_channel(&self, client_id: &str, ch_id: &str) -> anyhow::Result<()> {
403        let _ = match &self.db {
404            #[cfg(feature = "mongo")]
405            Database::Mongo(db) => delete_channel(db, client_id, ch_id).await?,
406        };
407        self.deregister_client(ch_id).await;
408        Ok(())
409    }
410
411    /// Validate client_id and client_secret.
412    pub async fn validate_app(&self, client_id: &str, client_secret: &str) -> anyhow::Result<()> {
413        let _ = match &self.db {
414            #[cfg(feature = "mongo")]
415            Database::Mongo(db) => fetch_app(db, client_id, client_secret).await?,
416        };
417        Ok(())
418    }
419}
420
421#[cfg(test)]
422mod tests {}