1use crate::Result;
2use std::cmp::min;
3
4use crate::handler::Module;
5use anyhow::anyhow;
6use async_trait::async_trait;
7use grammers_client::{Config, InitParams, SignInError, Update};
8use grammers_session::Session;
9use grammers_tl_types as tl;
10use std::ops::Deref;
11use std::path::Path;
12use std::sync::Arc;
13use std::time::Duration;
14use tokio::sync::Mutex;
15use tokio::task;
16use tokio::time::sleep;
17
18pub struct Client {
19 pub inner_client: Mutex<Option<grammers_client::Client>>,
20 pub modules: Arc<Vec<Module>>,
21 api_id: i32,
22 api_hash: String,
23 auth: Auth,
24 session_store: Box<dyn SessionStore + Sync + Send>,
25 init_params: Option<InitParams>,
26}
27
/// Outcome of running the module handlers against one event.
///
/// The two string slices carried by the non-empty variants are the module id
/// and the handler id, in that order.
enum MapResult<'a> {
    /// No handler reported that it processed the event.
    None,
    /// A handler returned `Ok(true)`.
    Process(&'a str, &'a str),
    /// A handler returned an error.
    Exception(&'a str, &'a str),
}
33
/// Runs the handlers of each module in `$hs` against an event and evaluates
/// to a [`MapResult`] describing what happened.
///
/// Arguments: the modules to iterate, the client passed to every handler, and
/// then any number of `event, Process::Variant` pairs. A handler whose
/// `process` matches a listed variant is invoked with the paired event;
/// handlers of other kinds are skipped.
///
/// All handlers of a module are run; the last one that returned `Ok(true)` or
/// an error determines the result. Once a module produced a result, the
/// remaining modules are not visited.
macro_rules! map_modules {
    ($hs:expr, $cp:expr $(,$event:expr, $process:path)* $(,)?) => {{
        let mut result = MapResult::None;
        for m in $hs {
            for h in &m.handlers {
                match &h.process {
                    $(
                        $process(e) => match e.handle($cp, $event).await {
                            Ok(true) => result = MapResult::Process(&m.id, &h.id),
                            Ok(false) => {}
                            Err(err) => {
                                tracing::error!("error : {:?}", err);
                                result = MapResult::Exception(&m.id, &h.id);
                            }
                        },
                    )*
                    _ => (),
                }
            }
            // Stop at the first module that either processed the event or failed.
            if !matches!(result, MapResult::None) {
                break;
            }
        }
        match result {
            MapResult::None => tracing::debug!("not process"),
            MapResult::Process(m, h) => tracing::debug!("process by : {m} : {h}"),
            // Distinct wording so a failed handler is not reported as a success.
            MapResult::Exception(m, h) => tracing::debug!("exception by : {m} : {h}"),
        }
        result
    }};
}
69
70async fn hand(modules: Arc<Vec<Module>>, mut client: grammers_client::Client, update: Update) {
71 let client_point = &mut client;
72 let update_point = &update;
73 match update_point {
74 Update::NewMessage(message) => {
75 tracing::debug!(
76 "New Message : {} : {} : {}",
77 message.chat().id(),
78 message.id(),
79 message.text()
80 );
81 let _ = map_modules!(
82 modules.deref(),
83 client_point,
84 message,
85 crate::handler::Process::NewMessageProcess,
86 update_point,
87 crate::handler::Process::UpdateProcess,
88 );
89 }
90 Update::MessageEdited(message) => {
91 tracing::debug!("Message Edited : {}", message.id());
92 let _ = map_modules!(
93 modules.deref(),
94 client_point,
95 message,
96 crate::handler::Process::MessageEditedProcess,
97 update_point,
98 crate::handler::Process::UpdateProcess,
99 );
100 }
101 Update::MessageDeleted(deletion) => {
102 tracing::debug!("Message Deleted : {:?}", deletion.messages());
103 let _ = map_modules!(
104 modules.deref(),
105 client_point,
106 deletion,
107 crate::handler::Process::MessageDeletedProcess,
108 update_point,
109 crate::handler::Process::UpdateProcess,
110 );
111 }
112 Update::CallbackQuery(callback_query) => {
113 tracing::debug!("Callback Query : {:?}", callback_query.chat().id());
114 let _ = map_modules!(
115 modules.deref(),
116 client_point,
117 callback_query,
118 crate::handler::Process::CallbackQueryProcess,
119 update_point,
120 crate::handler::Process::UpdateProcess,
121 );
122 }
123 Update::InlineQuery(inline_query) => {
124 tracing::debug!("Inline Query : {:?}", inline_query.text());
125 let _ = map_modules!(
126 modules.deref(),
127 client_point,
128 inline_query,
129 crate::handler::Process::InlineQueryProcess,
130 update_point,
131 crate::handler::Process::UpdateProcess,
132 );
133 }
134 Update::Raw(update) => {
135 tracing::debug!("Raw : {:?}", update);
136 let _ = map_modules!(
137 modules.deref(),
138 client_point,
139 update,
140 crate::handler::Process::RawProcess,
141 update_point,
142 crate::handler::Process::UpdateProcess,
143 );
144 }
145 _ => {}
146 }
147}
148
149impl Client {
150 async fn load_session(&self) -> Result<Session> {
151 Ok(
152 if let Some(data) = self.session_store.on_load_session().await? {
153 Session::load(&data)?
154 } else {
155 Session::new()
156 },
157 )
158 }
159
160 async fn set_client(&self, inner_client: Option<grammers_client::Client>) {
161 let mut lock = self.inner_client.lock().await;
162 *lock = inner_client;
163 drop(lock);
164 }
165
166 async fn connect(&self) -> Result<grammers_client::Client> {
167 let connect = grammers_client::Client::connect(Config {
168 session: self.load_session().await?,
169 api_id: self.api_id.clone(), api_hash: self.api_hash.clone(),
171 params: match self.init_params.clone() {
172 None => InitParams::default(),
173 Some(params) => params,
174 },
175 })
176 .await;
177 let client = connect?;
178 self.set_client(Some(client.clone())).await;
179 Ok(client)
180 }
181}
182
183pub async fn run_client_and_reconnect<S: Into<Arc<Client>>>(client: S) -> Result<()> {
184 let client = client.into();
185 let mut inner_client = client.connect().await?;
186 tracing::info!("Connected! (first)");
187 tracing::info!("Sending ping...");
188 tracing::info!(
189 "{:?}",
190 inner_client
191 .invoke(&tl::functions::Ping { ping_id: 0 })
192 .await?
193 );
194 if !inner_client.is_authorized().await? {
195 let usr = match &client.auth {
196 Auth::AuthWithPhoneAndCode(auth) => {
197 let token = inner_client
198 .request_login_code(auth.input_phone().await?.as_str())
199 .await?;
200 match inner_client
201 .sign_in(&token, auth.input_code().await?.as_str())
202 .await
203 {
204 Err(SignInError::PasswordRequired(password_token)) => {
205 inner_client
206 .check_password(password_token, auth.input_password().await?.as_str())
207 .await?
208 }
209 Ok(usr) => usr,
210 Err(err) => return Err(anyhow!(err)),
211 }
212 }
213 Auth::AuthWithBotToken(auth) => {
214 inner_client
215 .bot_sign_in(auth.input_bot_token().await?.as_str())
216 .await?
217 }
218 };
219 tracing::info!("login with id : {}", usr.id());
220 client
221 .session_store
222 .on_save_session(inner_client.session().save())
223 .await?;
224 } else {
225 let usr = inner_client.get_me().await?;
226 tracing::info!("session with id : {}", usr.id());
227 }
228
229 let mut error_counter = 0;
230
231 tracing::info!("Waiting for messages...");
232
233 loop {
235 if error_counter > 0 {
237 match client.connect().await {
238 Ok(client_new) => {
239 tracing::warn!("reconnected");
240 inner_client = client_new;
241 match inner_client.is_authorized().await {
242 Ok(auth) => {
243 if !auth {
244 tracing::error!("logged out, exit");
245 break;
246 }
247 }
248 Err(e) => {
249 error_counter += 1;
250 let sleep_sec = 2_u64.pow(min(10, error_counter));
251 tracing::error!("reconnect auth error : sleep {sleep_sec} sec : {e}");
252 sleep(Duration::from_secs(sleep_sec)).await;
253 }
254 }
255 }
256 Err(e) => {
257 error_counter += 1;
258 let sleep_sec = 2_u64.pow(min(10, error_counter));
259 tracing::error!("reconnect error : sleep {sleep_sec} sec : {e}");
260 sleep(Duration::from_secs(sleep_sec)).await;
261 }
262 }
263 }
264 tokio::select! {
265 result = inner_client.next_update() => match result {
266 Ok(update)=> {
267 error_counter = 0;
268 task::spawn(hand(client.modules.clone(),inner_client.clone(), update));
269 }
270 Err(e)=>{
271 error_counter+=1;
272 let sleep_sec = 2_u64.pow(min(10,error_counter));
273 tracing::error!("next_update error : sleep {sleep_sec} sec : {e}");
274 sleep(Duration::from_secs(sleep_sec)).await;
275 }
276 },
277 _ = tokio::signal::ctrl_c() => break,
278 }
279 }
280
281 Ok(())
282}
283
284pub struct ClientBuilder {
285 api_id: Option<i32>,
286 api_hash: Option<String>,
287 auth: Option<Auth>,
288 session_store: Option<Box<dyn SessionStore + Sync + Send>>,
289 modules: Option<Arc<Vec<Module>>>,
290 init_params: Option<InitParams>,
291}
292
293impl ClientBuilder {
294 pub fn new() -> Self {
295 Self {
296 api_id: None,
297 api_hash: None,
298 auth: None,
299 session_store: None,
300 modules: None,
301 init_params: None,
302 }
303 }
304
305 pub fn set_api_id(&mut self, api_id: i32) {
306 self.api_id = Some(api_id)
307 }
308
309 pub fn with_api_id(mut self, api_id: i32) -> Self {
310 self.set_api_id(api_id);
311 self
312 }
313
314 pub fn set_api_hash<S: Into<String>>(&mut self, api_hash: S) {
315 self.api_hash = Some(api_hash.into())
316 }
317
318 pub fn with_api_hash<S: Into<String>>(mut self, api_hash: S) -> Self {
319 self.set_api_hash(api_hash);
320 self
321 }
322
323 pub fn set_auth(&mut self, auth: Auth) {
324 self.auth = Some(auth)
325 }
326
327 pub fn with_auth(mut self, auth: Auth) -> Self {
328 self.set_auth(auth);
329 self
330 }
331
332 pub fn set_session_store(&mut self, session_store: Box<dyn SessionStore + Sync + Send>) {
333 self.session_store = Some(session_store)
334 }
335
336 pub fn with_session_store(
337 mut self,
338 session_store: Box<dyn SessionStore + Sync + Send>,
339 ) -> Self {
340 self.set_session_store(session_store);
341 self
342 }
343
344 pub fn set_modules<S: Into<Arc<Vec<Module>>>>(&mut self, s: S) {
345 self.modules = Some(s.into())
346 }
347
348 pub fn with_modules<S: Into<Arc<Vec<Module>>>>(mut self, s: S) -> Self {
349 self.set_modules(s);
350 self
351 }
352
353 pub fn set_init_params(&mut self, s: Option<InitParams>) {
354 self.init_params = s
355 }
356
357 pub fn with_init_params(mut self, s: Option<InitParams>) -> Self {
358 self.set_init_params(s);
359 self
360 }
361
362 pub fn build(self) -> Result<Client> {
363 return Ok(Client {
364 modules: self.modules.expect("must set modules"),
365 inner_client: Mutex::new(None),
366 api_id: self.api_id.expect("must set api_id"),
367 api_hash: self.api_hash.expect("must set api_hash"),
368 auth: self.auth.expect("must set auth"),
369 session_store: self.session_store.expect("must set session_store"),
370 init_params: self.init_params,
371 });
372 }
373}
374
375pub enum Auth {
376 AuthWithBotToken(Box<dyn AuthWithBotToken + Send + Sync>),
377 AuthWithPhoneAndCode(Box<dyn AuthWithPhoneAndCode + Send + Sync>),
378}
379
380#[async_trait]
381pub trait AuthWithBotToken {
382 async fn input_bot_token(&self) -> Result<String>;
383}
384
385#[async_trait]
386pub trait AuthWithPhoneAndCode {
387 async fn input_phone(&self) -> Result<String>;
388 async fn input_code(&self) -> Result<String>;
389 async fn input_password(&self) -> Result<String>;
390}
391
392#[async_trait]
393pub trait SessionStore {
394 async fn on_save_session(&self, data: Vec<u8>) -> Result<()>;
395 async fn on_load_session(&self) -> Result<Option<Vec<u8>>>;
396}
397
/// [`AuthWithBotToken`] implementation that always yields the same token.
pub struct StaticBotToken {
    /// The bot token to sign in with.
    pub token: String,
}
401
402#[async_trait]
403impl AuthWithBotToken for StaticBotToken {
404 async fn input_bot_token(&self) -> Result<String> {
405 return Ok(self.token.clone());
406 }
407}
408
/// [`SessionStore`] implementation that keeps the session in a single file.
pub struct FileSessionStore {
    /// Location of the session file.
    pub path: String,
}
412
413#[async_trait]
414impl SessionStore for FileSessionStore {
415 async fn on_save_session(&self, data: Vec<u8>) -> Result<()> {
416 tokio::fs::write(self.path.as_str(), data).await?;
417 Ok(())
418 }
419 async fn on_load_session(&self) -> Result<Option<Vec<u8>>> {
420 let path = Path::new(self.path.as_str());
421 if path.exists() {
422 Ok(Some(tokio::fs::read(path).await?))
423 } else {
424 Ok(None)
425 }
426 }
427}