rust_tdlib/client/
worker.rs

1//! Handlers for all incoming data
2use super::{
3    auth_handler::{AuthStateHandler, ConsoleAuthStateHandler},
4    observer::OBSERVER,
5    tdlib_client::{TdJson, TdLibClient},
6    {Client, ClientState},
7};
8use crate::client::{ClientIdentifier, CLIENT_NOT_AUTHORIZED};
9use crate::types::{CheckAuthenticationBotToken, GetAuthorizationState, JsonValue};
10use crate::{
11    errors::{Error, Result},
12    tdjson::ClientId,
13    types::{
14        AuthorizationState, CheckAuthenticationCode, CheckAuthenticationPassword,
15        CheckDatabaseEncryptionKey, GetApplicationConfig, RObject, RegisterUser,
16        SetAuthenticationPhoneNumber, SetTdlibParameters, Update, UpdateAuthorizationState,
17    },
18};
19use std::collections::HashMap;
20use std::sync::atomic::{AtomicBool, Ordering};
21use std::sync::Arc;
22use std::time::Duration;
23use tokio::sync::Mutex;
24use tokio::{
25    sync::{mpsc, RwLock},
26    task::JoinHandle,
27    time,
28};
29
30#[derive(Debug)]
31pub struct WorkerBuilder<A, T>
32where
33    A: AuthStateHandler + Send + Sync + 'static,
34    T: TdLibClient + Send + Sync + Clone + 'static,
35{
36    read_updates_timeout: f64,
37    channels_send_timeout: f64,
38    auth_state_handler: A,
39    tdlib_client: T,
40}
41
42impl Default for WorkerBuilder<ConsoleAuthStateHandler, TdJson> {
43    /// Provides default implementation with [ConsoleAuthStateHandler](crate::client::client::ConsoleAuthStateHandler)
44    fn default() -> Self {
45        Self {
46            read_updates_timeout: 1.0,
47            channels_send_timeout: 5.0,
48            auth_state_handler: ConsoleAuthStateHandler::new(),
49            tdlib_client: TdJson::new(),
50        }
51    }
52}
53
54impl<A, T> WorkerBuilder<A, T>
55where
56    A: AuthStateHandler + Send + Sync + 'static,
57    T: TdLibClient + Send + Sync + Clone + 'static,
58{
59    /// Specifies timeout which will be used during sending to [tokio::sync::mpsc](tokio::sync::mpsc).
60    pub fn with_channels_send_timeout(mut self, timeout: f64) -> Self {
61        self.channels_send_timeout = timeout;
62        self
63    }
64
65    pub fn with_read_updates_timeout(mut self, read_updates_timeout: f64) -> Self {
66        self.read_updates_timeout = read_updates_timeout;
67        self
68    }
69
70    /// [AuthStateHandler](crate::client::client::AuthStateHandler) allows you to handle particular "auth states", such as [WaitPassword](crate::types::AuthorizationStateWaitPassword), [WaitPhoneNumber](crate::types::AuthorizationStateWaitPhoneNumber) and so on.
71    /// See [AuthorizationState](crate::types::AuthorizationState).
72    pub fn with_auth_state_handler<N>(self, auth_state_handler: N) -> WorkerBuilder<N, T>
73    where
74        N: AuthStateHandler + Send + Sync + 'static,
75    {
76        WorkerBuilder {
77            auth_state_handler,
78            read_updates_timeout: self.read_updates_timeout,
79            channels_send_timeout: self.channels_send_timeout,
80            tdlib_client: self.tdlib_client,
81        }
82    }
83
84    #[doc(hidden)]
85    pub fn with_tdlib_client<C>(self, tdlib_client: C) -> WorkerBuilder<A, C>
86    where
87        C: TdLibClient + Send + Sync + Clone + 'static,
88    {
89        WorkerBuilder {
90            tdlib_client,
91            auth_state_handler: self.auth_state_handler,
92            read_updates_timeout: self.read_updates_timeout,
93            channels_send_timeout: self.channels_send_timeout,
94        }
95    }
96
97    pub fn build(self) -> Result<Worker<A, T>> {
98        let worker = Worker::new(
99            self.auth_state_handler,
100            self.read_updates_timeout,
101            self.channels_send_timeout,
102            self.tdlib_client,
103        );
104        Ok(worker)
105    }
106}
107
108pub(crate) type StateMessage = Result<ClientState, (Error, UpdateAuthorizationState)>;
109
110#[derive(Debug, Clone)]
111struct ClientContext<S: TdLibClient + Clone> {
112    client: Client<S>,
113    private_state_message_sender: mpsc::Sender<ClientState>,
114    private_state_message_receiver: Arc<Mutex<mpsc::Receiver<ClientState>>>,
115    pub_state_message_sender: Option<mpsc::Sender<StateMessage>>,
116    pub_state_message_receiver: Option<Arc<Mutex<mpsc::Receiver<StateMessage>>>>,
117}
118
119impl<S> ClientContext<S>
120where
121    S: TdLibClient + Clone,
122{
123    pub fn client(&self) -> &Client<S> {
124        &self.client
125    }
126    pub fn private_state_message_receiver(&self) -> &Arc<Mutex<mpsc::Receiver<ClientState>>> {
127        &self.private_state_message_receiver
128    }
129    pub fn private_state_message_sender(&self) -> &mpsc::Sender<ClientState> {
130        &self.private_state_message_sender
131    }
132    pub fn pub_state_message_sender(&self) -> &Option<mpsc::Sender<StateMessage>> {
133        &self.pub_state_message_sender
134    }
135    pub fn pub_state_message_receiver(&self) -> &Option<Arc<Mutex<mpsc::Receiver<StateMessage>>>> {
136        &self.pub_state_message_receiver
137    }
138}
139
140type ClientsMap<S> = HashMap<ClientId, ClientContext<S>>;
141
142/// The main object in all interactions.
143/// You have to [start](crate::client::worker::Worker::start) worker and bind each client with worker using [auth_client](crate::client::worker::Worker::auth_client).
144#[derive(Debug, Clone)]
145pub struct Worker<A, S>
146where
147    A: AuthStateHandler + Send + Sync + 'static,
148    S: TdLibClient + Send + Sync + Clone + 'static,
149{
150    run_flag: Arc<AtomicBool>,
151    auth_state_handler: Arc<A>,
152    read_updates_timeout: Duration,
153    channels_send_timeout: Duration,
154    tdlib_client: S,
155    clients: Arc<RwLock<ClientsMap<S>>>,
156}
157
158impl Worker<ConsoleAuthStateHandler, TdJson> {
159    pub fn builder() -> WorkerBuilder<ConsoleAuthStateHandler, TdJson> {
160        WorkerBuilder::default()
161    }
162}
163
164impl<A, T> Worker<A, T>
165where
166    A: AuthStateHandler + Send + Sync + 'static,
167    T: TdLibClient + Send + Sync + Clone + 'static,
168{
169    /// Returns state of the client.
170    pub async fn get_client_state(
171        &self,
172        client: &Client<T>,
173    ) -> Result<(ClientState, AuthorizationState)> {
174        let state = client
175            .get_authorization_state(GetAuthorizationState::builder().build())
176            .await?;
177        match &state {
178            AuthorizationState::_Default => {
179                panic!()
180            }
181            AuthorizationState::Closed(_) => Ok((ClientState::Closed, state)),
182            AuthorizationState::Closing(_) => Ok((ClientState::Closed, state)),
183            AuthorizationState::LoggingOut(_) => Ok((ClientState::Closed, state)),
184            AuthorizationState::Ready(_) => Ok((ClientState::Opened, state)),
185            AuthorizationState::WaitCode(_) => Ok((ClientState::Authorizing, state)),
186            AuthorizationState::WaitEncryptionKey(_) => Ok((ClientState::Authorizing, state)),
187            AuthorizationState::WaitOtherDeviceConfirmation(_) => {
188                Ok((ClientState::Authorizing, state))
189            }
190            AuthorizationState::WaitPassword(_) => Ok((ClientState::Authorizing, state)),
191            AuthorizationState::WaitPhoneNumber(_) => Ok((ClientState::Authorizing, state)),
192            AuthorizationState::WaitRegistration(_) => Ok((ClientState::Authorizing, state)),
193            AuthorizationState::WaitTdlibParameters(_) => Ok((ClientState::Authorizing, state)),
194            AuthorizationState::GetAuthorizationState(_) => {
195                panic!()
196            }
197        }
198    }
199
200    /// Drops authorized client.
201    /// After method call you cannot interact with TDLib by the client.
202    pub async fn reset_auth(&mut self, client: &mut Client<T>) -> Result<()> {
203        client.stop().await?;
204        let client_id = client.take_client_id()?;
205        self.clients.write().await.remove(&client_id);
206        Ok(())
207    }
208
209    /// Method waits for client state changes.
210    /// If an error occured during authorization flow, you receive [AuthorizationState](crate::types::authorization_state::AuthorizationState) on which it happened.
211    /// You have to setup [channel](tokio::sync::mpsc::channel) by call [Client::builder().with_auth_state_channel(...)](Client::builder().with_auth_state_channel(...))
212    pub async fn wait_auth_state_change(&self, client: &Client<T>) -> Result<StateMessage> {
213        let client_id = client.get_client_id().ok_or(CLIENT_NOT_AUTHORIZED)?;
214        match self.clients.read().await.get(&client_id) {
215            None => {Err(Error::BadRequest("client not authorized yet"))}
216            Some(v) => {
217                match v.pub_state_message_receiver() {
218                    None => {Err(Error::BadRequest("state receiver not specified, need to call `Client::builder().with_auth_state_channel(...) before Worker::bind_client(...)"))}
219                    Some(rec) => {
220                        let state = rec.lock().await.recv().await.ok_or(Error::Internal("can't receive state: channel closed"))?;
221                        Ok(state)
222                    }
223                }
224            }
225        }
226    }
227
228    /// Method waits for client state changes.
229    /// It differ from [wait_auth_state_change](crate::client::worker::Worker::wait_auth_state_change) by error type: you won't receive (AuthorizationState)[crate::types::authorization_state::AuthorizationState] when error occured.
230    /// Method may be useful if client already authorized on, for example, previous application startup.
231    pub async fn wait_client_state(&self, client: &Client<T>) -> Result<ClientState> {
232        let guard = self.clients.read().await;
233        match guard.get(&client.get_client_id().ok_or(CLIENT_NOT_AUTHORIZED)?) {
234            None => Err(Error::BadRequest("client not bound yet")),
235            Some(ctx) => {
236                let mut rec = ctx.private_state_message_receiver().lock().await;
237                Ok(rec.recv().await.unwrap())
238            }
239        }
240    }
241
242    /// Binds client with worker and runs authorization routines.
243    /// Method returns error if worker is not running or client already bound
244    pub async fn bind_client(&mut self, mut client: Client<T>) -> Result<Client<T>> {
245        if !self.is_running() {
246            return Err(Error::BadRequest("worker not started yet"));
247        };
248        let client_id = client.get_tdlib_client().new_client();
249        log::debug!("new client created: {}", client_id);
250        client.set_client_id(client_id)?;
251        self.store_client_context(&client).await?;
252
253        Ok(client)
254    }
255
256    pub async fn reload_client(&mut self, mut client: Client<T>) -> Result<Client<T>> {
257        if !self.is_running() {
258            return Err(Error::BadRequest("worker not started yet"));
259        };
260        let client_id = client.get_tdlib_client().new_client();
261        log::debug!("new client created: {}", client_id);
262        let old_client_id = client.reload(client_id).await?;
263        self.store_client_context(&client).await?;
264        self.clients.write().await.remove(&old_client_id);
265
266        Ok(client)
267    }
268
269    async fn store_client_context(&self, client: &Client<T>) -> Result<()> {
270        let (sx, rx) = match client.get_auth_state_channel_size() {
271            None => (None, None),
272            Some(size) => {
273                let (sx, rx) = mpsc::channel(size);
274                (Some(sx), Some(Arc::new(Mutex::new(rx))))
275            }
276        };
277
278        let (psx, prx) = mpsc::channel::<ClientState>(5);
279        let ctx = ClientContext {
280            client: client.clone(),
281            pub_state_message_sender: sx,
282            pub_state_message_receiver: rx,
283            private_state_message_receiver: Arc::new(Mutex::new(prx)),
284            private_state_message_sender: psx,
285        };
286
287        let client_id = client.get_client_id().ok_or(CLIENT_NOT_AUTHORIZED)?;
288
289        self.clients.write().await.insert(client_id, ctx);
290        log::debug!("new client added");
291
292        // We need to call any tdlib method to retrieve first response.
293        // Otherwise client can't be authorized: no `UpdateAuthorizationState` send by TDLib.
294        first_internal_request(&client.get_tdlib_client(), client_id).await;
295
296        log::trace!("received first internal response");
297
298        Ok(())
299    }
300
301    /// Determines that the worker is running.
302    pub fn is_running(&self) -> bool {
303        self.run_flag.load(Ordering::Acquire)
304    }
305
306    #[cfg(test)]
307    // Method needs for tests because we can't handle get_application_config request properly.
308    pub async fn set_client(&mut self, mut client: Client<T>) -> Client<T> {
309        let client_id = client.get_tdlib_client().new_client();
310        log::debug!("new client created: {}", client_id);
311        client.set_client_id(client_id).unwrap();
312
313        let (psx, prx) = mpsc::channel::<ClientState>(5);
314        let ctx = ClientContext {
315            client: client.clone(),
316            pub_state_message_sender: None,
317            pub_state_message_receiver: None,
318            private_state_message_receiver: Arc::new(Mutex::new(prx)),
319            private_state_message_sender: psx,
320        };
321
322        self.clients.write().await.insert(client_id, ctx);
323        client
324    }
325
326    // Client must be created only with builder
327    pub(crate) fn new(
328        auth_state_handler: A,
329        read_updates_timeout: f64,
330        channels_send_timeout: f64,
331        tdlib_client: T,
332    ) -> Self {
333        let run_flag = Arc::new(AtomicBool::new(false));
334        let clients: ClientsMap<T> = HashMap::new();
335
336        Self {
337            run_flag,
338            tdlib_client,
339            read_updates_timeout: time::Duration::from_secs_f64(read_updates_timeout),
340            channels_send_timeout: time::Duration::from_secs_f64(channels_send_timeout),
341            auth_state_handler: Arc::new(auth_state_handler),
342            clients: Arc::new(RwLock::new(clients)),
343        }
344    }
345
346    /// Starts interaction with TDLib.
347    /// It returns [JoinHandle](tokio::task::JoinHandle) which allows you to handle worker state: if it yields - so worker is definitely stopped.
348    pub fn start(&mut self) -> JoinHandle<()> {
349        let (auth_sx, auth_rx) = mpsc::channel::<UpdateAuthorizationState>(20);
350
351        self.run_flag.store(true, Ordering::Release);
352        let updates_handle = self.init_updates_task(auth_sx);
353        let auth_handle = self.init_auth_task(auth_rx);
354
355        let run_flag = self.run_flag.clone();
356
357        tokio::spawn(async move {
358            tokio::select! {
359                _ = auth_handle => {
360                    log::debug!("authorization task stopped");
361                },
362                _ = updates_handle => {
363                    log::debug!("updates task stopped");
364                },
365            };
366            run_flag.store(false, Ordering::Release);
367        })
368    }
369
370    /// Stops the client.
371    /// You may want to await JoinHandle retrieved with `client.start().await` after calling `stop`.
372    pub fn stop(&self) {
373        self.run_flag.store(false, Ordering::Release);
374    }
375
376    // It's the base routine: sends received updates to particular handlers: observer or auth_state handler
377    fn init_updates_task(&self, auth_sx: mpsc::Sender<UpdateAuthorizationState>) -> JoinHandle<()> {
378        let run_flag = self.run_flag.clone();
379        let clients = self.clients.clone();
380        let recv_timeout = self.read_updates_timeout;
381        let send_timeout = self.channels_send_timeout;
382        let tdlib_client = Arc::new(self.tdlib_client.clone());
383
384        tokio::spawn(async move {
385            let current = tokio::runtime::Handle::try_current().unwrap();
386            while run_flag.load(Ordering::Acquire) {
387                let cl = tdlib_client.clone();
388                if let Some(json) = current
389                    .spawn_blocking(move || cl.receive(recv_timeout.as_secs_f64()))
390                    .await
391                    .unwrap()
392                {
393                    log::trace!("received json from tdlib: {}", json);
394                    handle_td_resp_received(json.as_str(), &auth_sx, &clients, send_timeout).await;
395                }
396            }
397        })
398    }
399
400    pub async fn handle_auth_state(
401        &self,
402        auth_state: &AuthorizationState,
403        client: &Client<T>,
404    ) -> Result<()> {
405        let clients_guard = self.clients.read().await;
406        match clients_guard.get(&client.get_client_id().ok_or(CLIENT_NOT_AUTHORIZED)?) {
407            None => Err(Error::BadRequest("client not bound yet")),
408            Some(ctx) => {
409                handle_auth_state(
410                    client,
411                    ctx.pub_state_message_sender(),
412                    ctx.private_state_message_sender(),
413                    self.auth_state_handler.as_ref(),
414                    auth_state,
415                    self.channels_send_timeout,
416                )
417                .await
418            }
419        }
420    }
421
422    // creates task handles [UpdateAuthorizationState][crate::types::UpdateAuthorizationState] and sends it to particular methods of specified [AuthStateHandler](crate::client::client::AuthStateHandler)
423    fn init_auth_task(
424        &self,
425        mut auth_rx: mpsc::Receiver<UpdateAuthorizationState>,
426    ) -> JoinHandle<()> {
427        let auth_state_handler = self.auth_state_handler.clone();
428        let clients = self.clients.clone();
429        let send_timeout = self.channels_send_timeout;
430
431        tokio::spawn(async move {
432            while let Some(auth_state) = auth_rx.recv().await {
433                log::debug!("received new auth state: {:?}", auth_state);
434                if let Some(client_id) = auth_state.client_id() {
435                    let result = match clients.read().await.get(&client_id) {
436                        None => {
437                            log::warn!("found auth updates for unavailable client ({})", client_id);
438                            continue;
439                        }
440                        Some(client_ctx) => {
441                            handle_auth_state(
442                                client_ctx.client(),
443                                client_ctx.pub_state_message_sender(),
444                                client_ctx.private_state_message_sender(),
445                                auth_state_handler.as_ref(),
446                                auth_state.authorization_state(),
447                                send_timeout,
448                            )
449                            .await
450                        }
451                    };
452
453                    match result {
454                        Ok(_) => {
455                            log::debug!("state changes handled properly")
456                        }
457                        Err(err) => {
458                            match clients.read().await.get(&client_id) {
459                                None => {
460                                    log::error!("client not found")
461                                }
462                                Some(cl) => match cl.pub_state_message_sender() {
463                                    Some(state_sender) => {
464                                        if let Err(err) = state_sender
465                                            .send_timeout(Err((err, auth_state)), send_timeout)
466                                            .await
467                                        {
468                                            log::error!("cannot send client state changes: {}", err)
469                                        }
470                                    }
471                                    None => {
472                                        log::warn!("error received and possibly cannot be handled because of empty state receiver for client {client_id}: {err}")
473                                    }
474                                },
475                            };
476                        }
477                    }
478                }
479            }
480        })
481    }
482}
483
484async fn handle_td_resp_received<S: TdLibClient + Send + Sync + Clone>(
485    response: &str,
486    auth_sx: &mpsc::Sender<UpdateAuthorizationState>,
487    clients: &RwLock<ClientsMap<S>>,
488    send_timeout: Duration,
489) {
490    match serde_json::from_str::<serde_json::Value>(response) {
491        Err(e) => log::error!("can't deserialize tdlib data: {}", e),
492        Ok(t) => {
493            if let Some(t) = OBSERVER.notify(t) {
494                match serde_json::from_value::<Update>(t) {
495                    Err(err) => {
496                        log::error!("cannot deserialize to update: {err:?}, data: {response:?}")
497                    }
498                    Ok(update) => {
499                        if let Update::AuthorizationState(auth_state) = update {
500                            log::trace!("auth state send: {:?}", auth_state);
501                            match auth_sx.send_timeout(auth_state, send_timeout).await {
502                                Ok(_) => {
503                                    log::trace!("auth state sent");
504                                }
505                                Err(err) => {
506                                    log::error!("can't send auth state update: {}", err)
507                                }
508                            };
509                        } else if let Some(client_id) = update.client_id() {
510                            match clients.read().await.get(&client_id) {
511                                None => {
512                                    log::warn!(
513                                        "found updates for unavailable client ({})",
514                                        client_id
515                                    )
516                                }
517                                Some(ctx) => {
518                                    if let Some(sender) = ctx.client().updates_sender() {
519                                        log::trace!("sending update to client");
520                                        match sender
521                                            .send_timeout(Box::new(update), send_timeout)
522                                            .await
523                                        {
524                                            Ok(_) => {
525                                                log::trace!("update sent");
526                                            }
527                                            Err(err) => {
528                                                log::error!("can't send update: {}", err)
529                                            }
530                                        };
531                                    }
532                                }
533                            }
534                        }
535                    }
536                }
537            }
538        }
539    }
540}
541
542impl<A, S> Drop for Worker<A, S>
543where
544    A: AuthStateHandler + Send + Sync + 'static,
545    S: TdLibClient + Send + Sync + Clone + 'static,
546{
547    fn drop(&mut self) {
548        self.stop();
549    }
550}
551
552async fn handle_auth_state<A, R>(
553    client: &Client<R>,
554    pub_state_sender: &Option<mpsc::Sender<StateMessage>>,
555    private_state_sender: &mpsc::Sender<ClientState>,
556    auth_state_handler: &A,
557    state: &AuthorizationState,
558    send_state_timeout: Duration,
559) -> Result<()>
560where
561    A: AuthStateHandler + Sync,
562    R: TdLibClient + Clone,
563{
564    log::debug!("handling new auth state: {:?}", state);
565    let mut result_state = None;
566    let res = match state {
567        AuthorizationState::_Default => Ok(()),
568        AuthorizationState::Closing(_) => Ok(()),
569        AuthorizationState::LoggingOut(_) => Ok(()),
570        AuthorizationState::Closed(_) => {
571            result_state = Some(ClientState::Closed);
572            Ok(())
573        }
574        AuthorizationState::Ready(_) => {
575            log::debug!("ready state received, send signal");
576            result_state = Some(ClientState::Opened);
577            Ok(())
578        }
579        AuthorizationState::WaitCode(wait_code) => {
580            let code = auth_state_handler
581                .handle_wait_code(client.get_auth_handler(), wait_code)
582                .await;
583            client
584                .check_authentication_code(CheckAuthenticationCode::builder().code(code).build())
585                .await?;
586            Ok(())
587        }
588        AuthorizationState::WaitEncryptionKey(wait_encryption_key) => {
589            let key = auth_state_handler
590                .handle_encryption_key(client.get_auth_handler(), wait_encryption_key)
591                .await;
592            log::debug!("checking encryption key");
593            client
594                .check_database_encryption_key(
595                    CheckDatabaseEncryptionKey::builder()
596                        .encryption_key(key)
597                        .build(),
598                )
599                .await?;
600            log::debug!("encryption key check done");
601            Ok(())
602        }
603        AuthorizationState::WaitOtherDeviceConfirmation(wait_device_confirmation) => {
604            log::debug!("handling other device confirmation");
605            auth_state_handler
606                .handle_other_device_confirmation(
607                    client.get_auth_handler(),
608                    wait_device_confirmation,
609                )
610                .await;
611            log::debug!("handled other device confirmation");
612            Ok(())
613        }
614        AuthorizationState::WaitPassword(wait_password) => {
615            let password = auth_state_handler
616                .handle_wait_password(client.get_auth_handler(), wait_password)
617                .await;
618            log::debug!("checking password");
619            client
620                .check_authentication_password(
621                    CheckAuthenticationPassword::builder()
622                        .password(password)
623                        .build(),
624                )
625                .await?;
626            log::debug!("password checked");
627            Ok(())
628        }
629        AuthorizationState::WaitPhoneNumber(wait_phone_number) => {
630            let identifier = auth_state_handler
631                .handle_wait_client_identifier(client.get_auth_handler(), wait_phone_number)
632                .await;
633            match identifier {
634                ClientIdentifier::BotToken(token) => {
635                    client
636                        .check_authentication_bot_token(
637                            CheckAuthenticationBotToken::builder().token(token).build(),
638                        )
639                        .await?;
640                    Ok(())
641                }
642                ClientIdentifier::PhoneNumber(phone) => {
643                    client
644                        .set_authentication_phone_number(
645                            SetAuthenticationPhoneNumber::builder()
646                                .phone_number(phone)
647                                .build(),
648                        )
649                        .await?;
650                    Ok(())
651                }
652            }
653        }
654        AuthorizationState::WaitRegistration(wait_registration) => {
655            log::debug!("handling wait registration");
656            let (first_name, last_name) = auth_state_handler
657                .handle_wait_registration(client.get_auth_handler(), wait_registration)
658                .await;
659            let register = RegisterUser::builder()
660                .first_name(first_name)
661                .last_name(last_name)
662                .build();
663            client.register_user(register).await?;
664            log::debug!("handled register user");
665            Ok(())
666        }
667        AuthorizationState::WaitTdlibParameters(_) => {
668            log::debug!("going to set tdlib parameters");
669            client
670                .set_tdlib_parameters(
671                    SetTdlibParameters::builder()
672                        .parameters(client.tdlib_parameters())
673                        .build(),
674                )
675                .await?;
676            log::debug!("tdlib parameters set");
677            Ok(())
678        }
679        AuthorizationState::GetAuthorizationState(_) => Err(Error::Internal(
680            "retrieved GetAuthorizationState update but observer not found any subscriber",
681        )),
682    };
683
684    match &result_state {
685        None => {}
686        Some(state) => {
687            if let Err(err) = private_state_sender.send(state.clone()).await {
688                {
689                    log::error!(
690                        "can't send state update, but state changed; error: {:?}, state: {:?}",
691                        err,
692                        state
693                    )
694                };
695            }
696
697            if let Some(sender) = &pub_state_sender {
698                if let Err(err) = sender
699                    .send_timeout(Ok(state.clone()), send_state_timeout)
700                    .await
701                {
702                    log::error!(
703                        "can't send state update, but state changed; error: {:?}, state: {:?}",
704                        err,
705                        state
706                    )
707                };
708            }
709        }
710    }
711    res
712}
713
714async fn first_internal_request<S: TdLibClient>(tdlib_client: &S, client_id: ClientId) {
715    let req = GetApplicationConfig::builder().build();
716    let extra = match req.as_ref().extra().ok_or(Error::Internal(
717        "invalid tdlib response type, not have `extra` field",
718    )) {
719        Ok(v) => v,
720        Err(err) => {
721            log::error!("{}", err);
722            return;
723        }
724    };
725    let signal = OBSERVER.subscribe(extra);
726    if let Err(err) = tdlib_client.send(client_id, req.as_ref()) {
727        log::error!("{}", err);
728        return;
729    };
730
731    let received = signal.await;
732    OBSERVER.unsubscribe(extra);
733    match received {
734        Err(_) => log::error!("receiver already closed"),
735        Ok(v) => {
736            log::trace!("first internal response: {v}");
737            if let Err(e) = serde_json::from_value::<JsonValue>(v) {
738                log::warn!("invalid first internal response received: {}", e)
739            }
740        }
741    };
742}
743
744#[cfg(test)]
745mod tests {
746    use crate::client::tdlib_client::TdLibClient;
747    use crate::client::worker::Worker;
748    use crate::client::Client;
749    use crate::errors::Result;
750    use crate::tdjson;
751    use crate::types::{Chats, RFunction, RObject, SearchPublicChats, TdlibParameters};
752    use std::time::Duration;
753    use tokio::time::timeout;
754
755    #[derive(Clone)]
756    struct MockedRawApi {
757        to_receive: Option<String>,
758    }
759
760    impl MockedRawApi {
761        pub fn set_to_receive(&mut self, value: String) {
762            log::trace!("delayed to receive: {}", value);
763            self.to_receive = Some(value);
764        }
765
766        pub fn new() -> Self {
767            Self { to_receive: None }
768        }
769    }
770
771    impl TdLibClient for MockedRawApi {
772        fn send<Fnc: RFunction>(&self, _client_id: tdjson::ClientId, _fnc: Fnc) -> Result<()> {
773            Ok(())
774        }
775
776        fn receive(&self, _timeout: f64) -> Option<String> {
777            self.to_receive.clone()
778        }
779
780        fn execute<Fnc: RFunction>(&self, _fnc: Fnc) -> Result<Option<String>> {
781            unimplemented!()
782        }
783
784        fn new_client(&self) -> tdjson::ClientId {
785            1
786        }
787    }
788
789    #[tokio::test]
790    async fn test_start_and_auth() {
791        let mocked_raw_api = MockedRawApi::new();
792        let mut worker = Worker::builder()
793            .with_tdlib_client(mocked_raw_api.clone())
794            .build()
795            .unwrap();
796        let res = timeout(
797            Duration::from_millis(50),
798            worker.bind_client(
799                Client::builder()
800                    .with_tdlib_client(mocked_raw_api.clone())
801                    .with_tdlib_parameters(TdlibParameters::builder().build())
802                    .build()
803                    .unwrap(),
804            ),
805        )
806        .await;
807        match res {
808            Err(e) => panic!("{:?}", e),
809            Ok(v) => match v {
810                Err(e) => assert_eq!(e.to_string(), "worker not started yet".to_string()),
811                Ok(_) => panic!("error not raised"),
812            },
813        };
814
815        worker.start();
816        // we can't handle first request because we do not know @extra. so just wait a while.
817        let res = timeout(
818            Duration::from_millis(50),
819            worker.bind_client(
820                Client::builder()
821                    .with_tdlib_client(mocked_raw_api.clone())
822                    .with_tdlib_parameters(TdlibParameters::builder().build())
823                    .build()
824                    .unwrap(),
825            ),
826        )
827        .await;
828        match res {
829            Err(_) => {}
830            _ => panic!("error not raised"),
831        };
832    }
833
834    #[tokio::test]
835    async fn test_request_flow() {
836        let mut mocked_raw_api = MockedRawApi::new();
837
838        let search_req = SearchPublicChats::builder().build();
839        let chats = Chats::builder().chat_ids(vec![1, 2, 3]).build();
840        let chats: serde_json::Value = serde_json::to_value(chats).unwrap();
841        let mut chats_object = chats.as_object().unwrap().clone();
842        chats_object.insert(
843            "@client_id".to_string(),
844            serde_json::Value::Number(1.into()),
845        );
846        chats_object.insert(
847            "@extra".to_string(),
848            serde_json::Value::String(search_req.extra().unwrap().to_string()),
849        );
850        chats_object.insert(
851            "@type".to_string(),
852            serde_json::Value::String("chats".to_string()),
853        );
854        let to_receive = serde_json::to_string(&chats_object).unwrap();
855        mocked_raw_api.set_to_receive(to_receive);
856        log::trace!("chats objects: {:?}", chats_object);
857
858        let mut worker = Worker::builder()
859            .with_tdlib_client(mocked_raw_api.clone())
860            .build()
861            .unwrap();
862        worker.start();
863
864        let client = worker
865            .set_client(
866                Client::builder()
867                    .with_tdlib_client(mocked_raw_api.clone())
868                    .with_tdlib_parameters(TdlibParameters::builder().build())
869                    .build()
870                    .unwrap(),
871            )
872            .await;
873
874        match timeout(
875            Duration::from_secs(10),
876            client.search_public_chats(search_req),
877        )
878        .await
879        {
880            Err(_) => panic!("did not receive response within 1 s"),
881            Ok(Err(e)) => panic!("{}", e),
882            Ok(Ok(result)) => assert_eq!(result.chat_ids(), &vec![1, 2, 3]),
883        }
884    }
885}