1pub mod model;
2
3#[cfg(feature = "mongo")]
4pub mod mongo;
5
6use anyhow::anyhow;
7use std::collections::HashMap;
8
9#[cfg(feature = "mongo")]
10pub use mongo::*;
11
12use crate::service::model::Body;
13use crate::{apns, email, fcm, huawei, rtm, xiaomi, PushResults};
14
15#[cfg(feature = "wecom")]
16use crate::wecom;
17
18pub enum Database {
19 #[cfg(feature = "mongo")]
20 Mongo(mongodb::Database),
21}
22
23pub enum Message {
24 Transparent(model::PushTransparentParams),
25 Notification(model::PushNotificationParams),
26}
27
28pub struct App {
29 service: super::Service,
30 #[cfg(feature = "mongo")]
31 db: Database,
32 channels: Vec<model::Channel>,
33}
34
35impl App {
36 #[cfg(feature = "mysql")]
37 pub async fn new(db: sea_orm::Database) -> App {
38 let svcs = super::Service::new();
39 Self {
40 service: svcs,
41 db,
42 channels: Vec::new(),
43 }
44 }
45
46 #[cfg(feature = "mongo")]
47 pub async fn new(mongodb: mongodb::Database) -> App {
48 let svcs = super::Service::new();
49 Self {
50 service: svcs,
51 db: Database::Mongo(mongodb),
52 channels: Vec::new(),
53 }
54 }
55
56 pub async fn init(&mut self) -> anyhow::Result<()> {
57 let chans = match &self.db {
58 Database::Mongo(db) => fetch_all_channels(db).await?,
59 };
60 self.channels = chans;
61 for chan in &self.channels {
62 self.register_client(chan).await?;
63 }
64 Ok(())
65 }
66
67 pub async fn running_ch_ids(&self) -> Vec<String> {
69 let pushers = self.service.pushers.read().await;
70 pushers.iter().map(|e| e.0.clone()).collect()
71 }
72
73 pub async fn register_client(&self, chan: &model::Channel) -> anyhow::Result<()> {
75 match Self::new_client(chan).await {
76 Ok(cli) => {
77 tracing::info!("load channel:{}", chan.ch_id);
78 self.service.register_client(&chan.ch_id, cli).await;
79 }
80 Err(e) => {
81 tracing::error!("load channel error:`{}` {} ", chan.ch_id, e);
82 }
83 }
84 Ok(())
85 }
86
87 pub async fn deregister_client(&self, ch_id: &str) {
89 self.service.remove_client(ch_id).await;
90 }
91
92 pub async fn new_client(conf: &model::Channel) -> anyhow::Result<super::Client> {
94 Ok(match conf._type {
95 #[cfg(feature = "xiaomi")]
96 model::ChannelType::Mi => super::Client::Mi(xiaomi::Client::new(&xiaomi::Config {
97 client_id: conf
98 .client_id
99 .as_ref()
100 .ok_or(anyhow!("mi missing `client_id`"))?
101 .as_str(),
102 client_secret: conf
103 .client_secret
104 .as_ref()
105 .ok_or(anyhow!("mi missing `client_secret`"))?
106 .as_str(),
107 project_id: conf
108 .project_id
109 .as_ref()
110 .ok_or(anyhow!("mi missing `project_id`"))?
111 .as_str(),
112 })?),
113 #[cfg(feature = "huawei")]
114 model::ChannelType::Huawei => super::Client::Huawei(
115 huawei::Client::new(
116 conf.client_id
117 .as_ref()
118 .ok_or(anyhow!("mi missing `client_id`"))?
119 .as_str(),
120 conf.client_secret
121 .as_ref()
122 .ok_or(anyhow!("mi missing `client_id`"))?
123 .as_str(),
124 )
125 .await?,
126 ),
127 #[cfg(feature = "fcm")]
128 model::ChannelType::Fcm => super::Client::Fcm(
129 fcm::Client::new(fcm::Config {
130 key_type: Some(
131 conf.key_type
132 .clone()
133 .ok_or(anyhow!("Fcm missing `key_type`"))?,
134 ),
135 project_id: Some(
136 conf.project_id
137 .clone()
138 .ok_or(anyhow!("Fcm missing `project_id`"))?,
139 ),
140 private_key_id: Some(
141 conf.private_key_id
142 .clone()
143 .ok_or(anyhow!("Fcm missing `private_key_id`"))?,
144 ),
145 private_key: (conf
146 .private_key
147 .clone()
148 .ok_or(anyhow!("Fcm missing `private_key`"))?),
149 client_email: (conf
150 .client_email
151 .clone()
152 .ok_or(anyhow!("Fcm missing `client_email`"))?),
153 client_id: Some(
154 conf.client_id
155 .clone()
156 .ok_or(anyhow!("Fcm missing `client_id`"))?,
157 ),
158 auth_uri: Some(
159 conf.auth_uri
160 .clone()
161 .ok_or(anyhow!("Fcm missing `auth_uri`"))?,
162 ),
163 token_uri: conf
164 .token_uri
165 .clone()
166 .ok_or(anyhow!("Fcm missing `token_uri`"))?,
167 auth_provider_x509_cert_url: Some(
168 conf.auth_provider_x509_cert_url
169 .clone()
170 .ok_or(anyhow!("Fcm missing `auth_provider_x509_cert_url`"))?,
171 ),
172 client_x509_cert_url: Some(
173 conf.client_x509_cert_url
174 .clone()
175 .ok_or(anyhow!("Fcm missing `client_x509_cert_url`"))?,
176 ),
177 })
178 .await?,
179 ),
180 #[cfg(feature = "wecom")]
181 model::ChannelType::Wecom => super::Client::Wecom(
182 wecom::Client::new(
183 conf.client_id
184 .as_ref()
185 .ok_or(anyhow!("Wecom missing `client_id`"))?
186 .as_str(),
187 conf.client_secret
188 .as_ref()
189 .ok_or(anyhow!("Wecom missing `client_secret`"))?
190 .as_str(),
191 conf.agentid.ok_or(anyhow!("Wecom missing `agentid`"))?,
192 )
193 .await?,
194 ),
195 #[cfg(feature = "apns")]
196 model::ChannelType::Apns => super::Client::Apns(apns::Client::new(
197 conf.certs
198 .as_ref()
199 .ok_or(anyhow!("Apns missing `certs`"))?
200 .as_slice(),
201 conf.client_secret
202 .as_ref()
203 .ok_or(anyhow!("Apns missing `client_secret`"))?
204 .as_str(),
205 )?),
206 #[cfg(feature = "email")]
207 model::ChannelType::Email => super::Client::Email(
208 email::Client::new(
209 conf.client_id
210 .as_ref()
211 .ok_or(anyhow!("Email missing `client_id`"))?
212 .as_str(),
213 conf.client_secret
214 .as_ref()
215 .ok_or(anyhow!("Email missing `client_secret`"))?
216 .as_str(),
217 conf.addr
218 .as_ref()
219 .ok_or(anyhow!("Email missing `addr`"))?
220 .as_str(),
221 )
222 .await,
223 ),
224 #[cfg(feature = "rtm")]
225 model::ChannelType::Rtm => super::Client::Rtm(rtm::Client::new(
226 conf.client_id
227 .as_ref()
228 .ok_or(anyhow!("Rtm missing `client_id`"))?
229 .as_str(),
230 conf.client_secret
231 .as_ref()
232 .ok_or(anyhow!("Rtm missing `client_secret`"))?
233 .as_str(),
234 )?),
235 model::ChannelType::Unknown => Err(anyhow!("Unknown channel type"))?,
236 })
237 }
238
239 pub async fn push_message(
241 &self,
242 client_id: &str,
243 msg: Message,
244 ) -> anyhow::Result<model::PushResp> {
245 let (groups, channels) = match &msg {
246 Message::Transparent(msg) => (msg.groups.as_slice(), msg.channels.as_slice()),
247 Message::Notification(msg) => (msg.groups.as_slice(), msg.channels.as_slice()),
248 };
249
250 let tokens = match &self.db {
251 #[cfg(feature = "mongo")]
252 Database::Mongo(db) => {
253 valid_client_id_and_ch_ids(db, client_id, channels).await?;
254
255 fetch_tokens(db, channels, groups).await?
256 }
257 };
258
259 let mut token_map: HashMap<&str, Vec<&str>> = HashMap::new();
260
261 for token in &tokens {
263 if let Some(vec) = token_map.get_mut(token.ch_id.as_str()) {
264 vec.push(&token.token);
265 } else {
266 let vec = vec![token.token.as_str()];
267 token_map.insert(&token.ch_id, vec);
268 }
269 }
270
271 let mut push_results = model::PushResp {
272 success: 0,
273 failure: 0,
274 results: Default::default(),
275 };
276
277 let body = match &msg {
278 Message::Transparent(msg) => match &msg.body {
279 Body::Json(_) => super::Body::Transparent(""),
280 Body::Text(text) => super::Body::Transparent(text),
281 },
282 Message::Notification(msg) => super::Body::Notify {
283 title: msg.title.as_str(),
284 body: msg.body.as_str(),
285 },
286 };
287
288 for (chan, tokens) in token_map {
290 let push_res = self
291 .service
292 .retry_batch_push(
293 &chan,
294 super::Message {
295 tokens: &tokens,
296 body: body,
297 android: None,
298 apns: None,
299 wecom: None,
300 },
301 )
302 .await?;
303 push_results.success += push_res.success;
304 push_results.failure += push_res.failure;
305 push_results.results.insert(chan.to_string(), push_res);
306 }
307 Ok(push_results)
308 }
309
310 pub async fn register_token(
312 &self,
313 client_id: &str,
314 group: &str,
315 ch_id: &str,
316 token: &str,
317 _override: Option<bool>,
318 ) -> anyhow::Result<()> {
319 match &self.db {
320 #[cfg(feature = "mongo")]
321 Database::Mongo(db) => {
322 valid_client_id_and_ch_id(db, client_id, ch_id).await?;
323 insert_token(db, client_id, ch_id, group, token, _override).await?
324 }
325 };
326 Ok(())
327 }
328
329 pub async fn revoke_token(
331 &self,
332 client_id: &str,
333 group: &str,
334 ch_id: &str,
335 token: &str,
336 ) -> anyhow::Result<()> {
337 match &self.db {
338 #[cfg(feature = "mongo")]
339 Database::Mongo(db) => {
340 valid_client_id_and_ch_id(db, client_id, ch_id).await?;
341 revoke_token(db, ch_id, group, token).await?
342 }
343 };
344 Ok(())
345 }
346
347 pub async fn create_channel(
349 &self,
350 app_id: &str,
351 params: model::PublicChannel,
352 ) -> anyhow::Result<String> {
353 let channel = match &self.db {
354 #[cfg(feature = "mongo")]
355 Database::Mongo(db) => create_channel(db, app_id, params).await?,
356 };
357 let cli = Self::new_client(&channel).await?;
358 let _ = self
359 .service
360 .register_client(channel.ch_id.as_str(), cli)
361 .await;
362 Ok(channel.ch_id)
363 }
364
365 pub async fn fetch_channels(&self, client_id: &str) -> anyhow::Result<Vec<model::Channel>> {
367 let channels = match &self.db {
368 #[cfg(feature = "mongo")]
369 Database::Mongo(db) => fetch_channels_by_client_id(db, client_id).await?,
370 };
371 Ok(channels)
372 }
373
374 pub async fn create_app(&self, name: &str) -> anyhow::Result<model::App> {
376 let app = match &self.db {
377 #[cfg(feature = "mongo")]
378 Database::Mongo(db) => create_app(db, name).await?,
379 };
380 Ok(app)
381 }
382
383 pub async fn fetch_apps(&self) -> anyhow::Result<Vec<model::App>> {
385 let apps = match &self.db {
386 #[cfg(feature = "mongo")]
387 Database::Mongo(db) => fetch_apps(db).await?,
388 };
389 Ok(apps)
390 }
391
392 pub async fn delete_app(&self, client_id: &str, client_secret: &str) -> anyhow::Result<()> {
394 let _ = match &self.db {
395 #[cfg(feature = "mongo")]
396 Database::Mongo(db) => delete_app(db, client_id, client_secret).await?,
397 };
398 Ok(())
399 }
400
401 pub async fn delete_channel(&self, client_id: &str, ch_id: &str) -> anyhow::Result<()> {
403 let _ = match &self.db {
404 #[cfg(feature = "mongo")]
405 Database::Mongo(db) => delete_channel(db, client_id, ch_id).await?,
406 };
407 self.deregister_client(ch_id).await;
408 Ok(())
409 }
410
411 pub async fn validate_app(&self, client_id: &str, client_secret: &str) -> anyhow::Result<()> {
413 let _ = match &self.db {
414 #[cfg(feature = "mongo")]
415 Database::Mongo(db) => fetch_app(db, client_id, client_secret).await?,
416 };
417 Ok(())
418 }
419}
420
421#[cfg(test)]
422mod tests {}