teleser/
client.rs

1use crate::Result;
2use std::cmp::min;
3
4use crate::handler::Module;
5use anyhow::anyhow;
6use async_trait::async_trait;
7use grammers_client::{Config, InitParams, SignInError, Update};
8use grammers_session::Session;
9use grammers_tl_types as tl;
10use std::ops::Deref;
11use std::path::Path;
12use std::sync::Arc;
13use std::time::Duration;
14use tokio::sync::Mutex;
15use tokio::task;
16use tokio::time::sleep;
17
18pub struct Client {
19    pub inner_client: Mutex<Option<grammers_client::Client>>,
20    pub modules: Arc<Vec<Module>>,
21    api_id: i32,
22    api_hash: String,
23    auth: Auth,
24    session_store: Box<dyn SessionStore + Sync + Send>,
25    init_params: Option<InitParams>,
26}
27
28enum MapResult<'a> {
29    None,
30    Process(&'a str, &'a str),
31    Exception(&'a str, &'a str),
32}
33
34macro_rules! map_modules {
35    ($hs:expr, $cp:expr $(,$event:expr, $process:path)* $(,)?) => {{
36        let mut result = MapResult::None;
37            for m in $hs {
38                for h in &m.handlers {
39                    match &h.process {
40                    $(
41                    $process(e) => match e.handle($cp, $event).await {
42                        Ok(b) => {
43                            if b {
44                                result = MapResult::Process(&m.id, &h.id);
45                            }
46                        }
47                        Err(err) => {
48                            tracing::error!("error : {:?}", err);
49                            result = MapResult::Exception(&m.id, &h.id);
50                        }
51                    },
52                    )*
53                    _ => (),
54                }
55                }
56                if let MapResult::None = result {
57                } else {
58                    break;
59                }
60            }
61            match result {
62                MapResult::None => tracing::debug!("not process"),
63                MapResult::Process(m, h) => tracing::debug!("process by : {m} : {h}"),
64                MapResult::Exception(m, h) => tracing::debug!("process by : {m} : {h}"),
65            }
66        result
67    }};
68}
69
70async fn hand(modules: Arc<Vec<Module>>, mut client: grammers_client::Client, update: Update) {
71    let client_point = &mut client;
72    let update_point = &update;
73    match update_point {
74        Update::NewMessage(message) => {
75            tracing::debug!(
76                "New Message : {} : {} : {}",
77                message.chat().id(),
78                message.id(),
79                message.text()
80            );
81            let _ = map_modules!(
82                modules.deref(),
83                client_point,
84                message,
85                crate::handler::Process::NewMessageProcess,
86                update_point,
87                crate::handler::Process::UpdateProcess,
88            );
89        }
90        Update::MessageEdited(message) => {
91            tracing::debug!("Message Edited : {}", message.id());
92            let _ = map_modules!(
93                modules.deref(),
94                client_point,
95                message,
96                crate::handler::Process::MessageEditedProcess,
97                update_point,
98                crate::handler::Process::UpdateProcess,
99            );
100        }
101        Update::MessageDeleted(deletion) => {
102            tracing::debug!("Message Deleted : {:?}", deletion.messages());
103            let _ = map_modules!(
104                modules.deref(),
105                client_point,
106                deletion,
107                crate::handler::Process::MessageDeletedProcess,
108                update_point,
109                crate::handler::Process::UpdateProcess,
110            );
111        }
112        Update::CallbackQuery(callback_query) => {
113            tracing::debug!("Callback Query : {:?}", callback_query.chat().id());
114            let _ = map_modules!(
115                modules.deref(),
116                client_point,
117                callback_query,
118                crate::handler::Process::CallbackQueryProcess,
119                update_point,
120                crate::handler::Process::UpdateProcess,
121            );
122        }
123        Update::InlineQuery(inline_query) => {
124            tracing::debug!("Inline Query : {:?}", inline_query.text());
125            let _ = map_modules!(
126                modules.deref(),
127                client_point,
128                inline_query,
129                crate::handler::Process::InlineQueryProcess,
130                update_point,
131                crate::handler::Process::UpdateProcess,
132            );
133        }
134        Update::Raw(update) => {
135            tracing::debug!("Raw : {:?}", update);
136            let _ = map_modules!(
137                modules.deref(),
138                client_point,
139                update,
140                crate::handler::Process::RawProcess,
141                update_point,
142                crate::handler::Process::UpdateProcess,
143            );
144        }
145        _ => {}
146    }
147}
148
149impl Client {
150    async fn load_session(&self) -> Result<Session> {
151        Ok(
152            if let Some(data) = self.session_store.on_load_session().await? {
153                Session::load(&data)?
154            } else {
155                Session::new()
156            },
157        )
158    }
159
160    async fn set_client(&self, inner_client: Option<grammers_client::Client>) {
161        let mut lock = self.inner_client.lock().await;
162        *lock = inner_client;
163        drop(lock);
164    }
165
166    async fn connect(&self) -> Result<grammers_client::Client> {
167        let connect = grammers_client::Client::connect(Config {
168            session: self.load_session().await?,
169            api_id: self.api_id.clone(), // not actually logging in, but has to look real
170            api_hash: self.api_hash.clone(),
171            params: match self.init_params.clone() {
172                None => InitParams::default(),
173                Some(params) => params,
174            },
175        })
176        .await;
177        let client = connect?;
178        self.set_client(Some(client.clone())).await;
179        Ok(client)
180    }
181}
182
183pub async fn run_client_and_reconnect<S: Into<Arc<Client>>>(client: S) -> Result<()> {
184    let client = client.into();
185    let mut inner_client = client.connect().await?;
186    tracing::info!("Connected! (first)");
187    tracing::info!("Sending ping...");
188    tracing::info!(
189        "{:?}",
190        inner_client
191            .invoke(&tl::functions::Ping { ping_id: 0 })
192            .await?
193    );
194    if !inner_client.is_authorized().await? {
195        let usr = match &client.auth {
196            Auth::AuthWithPhoneAndCode(auth) => {
197                let token = inner_client
198                    .request_login_code(auth.input_phone().await?.as_str())
199                    .await?;
200                match inner_client
201                    .sign_in(&token, auth.input_code().await?.as_str())
202                    .await
203                {
204                    Err(SignInError::PasswordRequired(password_token)) => {
205                        inner_client
206                            .check_password(password_token, auth.input_password().await?.as_str())
207                            .await?
208                    }
209                    Ok(usr) => usr,
210                    Err(err) => return Err(anyhow!(err)),
211                }
212            }
213            Auth::AuthWithBotToken(auth) => {
214                inner_client
215                    .bot_sign_in(auth.input_bot_token().await?.as_str())
216                    .await?
217            }
218        };
219        tracing::info!("login with id : {}", usr.id());
220        client
221            .session_store
222            .on_save_session(inner_client.session().save())
223            .await?;
224    } else {
225        let usr = inner_client.get_me().await?;
226        tracing::info!("session with id : {}", usr.id());
227    }
228
229    let mut error_counter = 0;
230
231    tracing::info!("Waiting for messages...");
232
233    // loop
234    loop {
235        // reconnect
236        if error_counter > 0 {
237            match client.connect().await {
238                Ok(client_new) => {
239                    tracing::warn!("reconnected");
240                    inner_client = client_new;
241                    match inner_client.is_authorized().await {
242                        Ok(auth) => {
243                            if !auth {
244                                tracing::error!("logged out, exit");
245                                break;
246                            }
247                        }
248                        Err(e) => {
249                            error_counter += 1;
250                            let sleep_sec = 2_u64.pow(min(10, error_counter));
251                            tracing::error!("reconnect auth error : sleep {sleep_sec} sec : {e}");
252                            sleep(Duration::from_secs(sleep_sec)).await;
253                        }
254                    }
255                }
256                Err(e) => {
257                    error_counter += 1;
258                    let sleep_sec = 2_u64.pow(min(10, error_counter));
259                    tracing::error!("reconnect error : sleep {sleep_sec} sec : {e}");
260                    sleep(Duration::from_secs(sleep_sec)).await;
261                }
262            }
263        }
264        tokio::select! {
265            result = inner_client.next_update() => match result {
266                Ok(update)=> {
267                    error_counter = 0;
268                    task::spawn(hand(client.modules.clone(),inner_client.clone(), update));
269                }
270                Err(e)=>{
271                    error_counter+=1;
272                    let sleep_sec = 2_u64.pow(min(10,error_counter));
273                    tracing::error!("next_update error : sleep {sleep_sec} sec : {e}");
274                    sleep(Duration::from_secs(sleep_sec)).await;
275                }
276            },
277            _ = tokio::signal::ctrl_c() => break,
278        }
279    }
280
281    Ok(())
282}
283
284pub struct ClientBuilder {
285    api_id: Option<i32>,
286    api_hash: Option<String>,
287    auth: Option<Auth>,
288    session_store: Option<Box<dyn SessionStore + Sync + Send>>,
289    modules: Option<Arc<Vec<Module>>>,
290    init_params: Option<InitParams>,
291}
292
293impl ClientBuilder {
294    pub fn new() -> Self {
295        Self {
296            api_id: None,
297            api_hash: None,
298            auth: None,
299            session_store: None,
300            modules: None,
301            init_params: None,
302        }
303    }
304
305    pub fn set_api_id(&mut self, api_id: i32) {
306        self.api_id = Some(api_id)
307    }
308
309    pub fn with_api_id(mut self, api_id: i32) -> Self {
310        self.set_api_id(api_id);
311        self
312    }
313
314    pub fn set_api_hash<S: Into<String>>(&mut self, api_hash: S) {
315        self.api_hash = Some(api_hash.into())
316    }
317
318    pub fn with_api_hash<S: Into<String>>(mut self, api_hash: S) -> Self {
319        self.set_api_hash(api_hash);
320        self
321    }
322
323    pub fn set_auth(&mut self, auth: Auth) {
324        self.auth = Some(auth)
325    }
326
327    pub fn with_auth(mut self, auth: Auth) -> Self {
328        self.set_auth(auth);
329        self
330    }
331
332    pub fn set_session_store(&mut self, session_store: Box<dyn SessionStore + Sync + Send>) {
333        self.session_store = Some(session_store)
334    }
335
336    pub fn with_session_store(
337        mut self,
338        session_store: Box<dyn SessionStore + Sync + Send>,
339    ) -> Self {
340        self.set_session_store(session_store);
341        self
342    }
343
344    pub fn set_modules<S: Into<Arc<Vec<Module>>>>(&mut self, s: S) {
345        self.modules = Some(s.into())
346    }
347
348    pub fn with_modules<S: Into<Arc<Vec<Module>>>>(mut self, s: S) -> Self {
349        self.set_modules(s);
350        self
351    }
352
353    pub fn set_init_params(&mut self, s: Option<InitParams>) {
354        self.init_params = s
355    }
356
357    pub fn with_init_params(mut self, s: Option<InitParams>) -> Self {
358        self.set_init_params(s);
359        self
360    }
361
362    pub fn build(self) -> Result<Client> {
363        return Ok(Client {
364            modules: self.modules.expect("must set modules"),
365            inner_client: Mutex::new(None),
366            api_id: self.api_id.expect("must set api_id"),
367            api_hash: self.api_hash.expect("must set api_hash"),
368            auth: self.auth.expect("must set auth"),
369            session_store: self.session_store.expect("must set session_store"),
370            init_params: self.init_params,
371        });
372    }
373}
374
375pub enum Auth {
376    AuthWithBotToken(Box<dyn AuthWithBotToken + Send + Sync>),
377    AuthWithPhoneAndCode(Box<dyn AuthWithPhoneAndCode + Send + Sync>),
378}
379
380#[async_trait]
381pub trait AuthWithBotToken {
382    async fn input_bot_token(&self) -> Result<String>;
383}
384
385#[async_trait]
386pub trait AuthWithPhoneAndCode {
387    async fn input_phone(&self) -> Result<String>;
388    async fn input_code(&self) -> Result<String>;
389    async fn input_password(&self) -> Result<String>;
390}
391
392#[async_trait]
393pub trait SessionStore {
394    async fn on_save_session(&self, data: Vec<u8>) -> Result<()>;
395    async fn on_load_session(&self) -> Result<Option<Vec<u8>>>;
396}
397
398pub struct StaticBotToken {
399    pub token: String,
400}
401
402#[async_trait]
403impl AuthWithBotToken for StaticBotToken {
404    async fn input_bot_token(&self) -> Result<String> {
405        return Ok(self.token.clone());
406    }
407}
408
409pub struct FileSessionStore {
410    pub path: String,
411}
412
413#[async_trait]
414impl SessionStore for FileSessionStore {
415    async fn on_save_session(&self, data: Vec<u8>) -> Result<()> {
416        tokio::fs::write(self.path.as_str(), data).await?;
417        Ok(())
418    }
419    async fn on_load_session(&self) -> Result<Option<Vec<u8>>> {
420        let path = Path::new(self.path.as_str());
421        if path.exists() {
422            Ok(Some(tokio::fs::read(path).await?))
423        } else {
424            Ok(None)
425        }
426    }
427}