1use super::{
3 auth_handler::{AuthStateHandler, ConsoleAuthStateHandler},
4 observer::OBSERVER,
5 tdlib_client::{TdJson, TdLibClient},
6 {Client, ClientState},
7};
8use crate::client::{ClientIdentifier, CLIENT_NOT_AUTHORIZED};
9use crate::types::{CheckAuthenticationBotToken, GetAuthorizationState, JsonValue};
10use crate::{
11 errors::{Error, Result},
12 tdjson::ClientId,
13 types::{
14 AuthorizationState, CheckAuthenticationCode, CheckAuthenticationPassword,
15 CheckDatabaseEncryptionKey, GetApplicationConfig, RObject, RegisterUser,
16 SetAuthenticationPhoneNumber, SetTdlibParameters, Update, UpdateAuthorizationState,
17 },
18};
19use std::collections::HashMap;
20use std::sync::atomic::{AtomicBool, Ordering};
21use std::sync::Arc;
22use std::time::Duration;
23use tokio::sync::Mutex;
24use tokio::{
25 sync::{mpsc, RwLock},
26 task::JoinHandle,
27 time,
28};
29
30#[derive(Debug)]
31pub struct WorkerBuilder<A, T>
32where
33 A: AuthStateHandler + Send + Sync + 'static,
34 T: TdLibClient + Send + Sync + Clone + 'static,
35{
36 read_updates_timeout: f64,
37 channels_send_timeout: f64,
38 auth_state_handler: A,
39 tdlib_client: T,
40}
41
42impl Default for WorkerBuilder<ConsoleAuthStateHandler, TdJson> {
43 fn default() -> Self {
45 Self {
46 read_updates_timeout: 1.0,
47 channels_send_timeout: 5.0,
48 auth_state_handler: ConsoleAuthStateHandler::new(),
49 tdlib_client: TdJson::new(),
50 }
51 }
52}
53
54impl<A, T> WorkerBuilder<A, T>
55where
56 A: AuthStateHandler + Send + Sync + 'static,
57 T: TdLibClient + Send + Sync + Clone + 'static,
58{
59 pub fn with_channels_send_timeout(mut self, timeout: f64) -> Self {
61 self.channels_send_timeout = timeout;
62 self
63 }
64
65 pub fn with_read_updates_timeout(mut self, read_updates_timeout: f64) -> Self {
66 self.read_updates_timeout = read_updates_timeout;
67 self
68 }
69
70 pub fn with_auth_state_handler<N>(self, auth_state_handler: N) -> WorkerBuilder<N, T>
73 where
74 N: AuthStateHandler + Send + Sync + 'static,
75 {
76 WorkerBuilder {
77 auth_state_handler,
78 read_updates_timeout: self.read_updates_timeout,
79 channels_send_timeout: self.channels_send_timeout,
80 tdlib_client: self.tdlib_client,
81 }
82 }
83
84 #[doc(hidden)]
85 pub fn with_tdlib_client<C>(self, tdlib_client: C) -> WorkerBuilder<A, C>
86 where
87 C: TdLibClient + Send + Sync + Clone + 'static,
88 {
89 WorkerBuilder {
90 tdlib_client,
91 auth_state_handler: self.auth_state_handler,
92 read_updates_timeout: self.read_updates_timeout,
93 channels_send_timeout: self.channels_send_timeout,
94 }
95 }
96
97 pub fn build(self) -> Result<Worker<A, T>> {
98 let worker = Worker::new(
99 self.auth_state_handler,
100 self.read_updates_timeout,
101 self.channels_send_timeout,
102 self.tdlib_client,
103 );
104 Ok(worker)
105 }
106}
107
108pub(crate) type StateMessage = Result<ClientState, (Error, UpdateAuthorizationState)>;
109
110#[derive(Debug, Clone)]
111struct ClientContext<S: TdLibClient + Clone> {
112 client: Client<S>,
113 private_state_message_sender: mpsc::Sender<ClientState>,
114 private_state_message_receiver: Arc<Mutex<mpsc::Receiver<ClientState>>>,
115 pub_state_message_sender: Option<mpsc::Sender<StateMessage>>,
116 pub_state_message_receiver: Option<Arc<Mutex<mpsc::Receiver<StateMessage>>>>,
117}
118
119impl<S> ClientContext<S>
120where
121 S: TdLibClient + Clone,
122{
123 pub fn client(&self) -> &Client<S> {
124 &self.client
125 }
126 pub fn private_state_message_receiver(&self) -> &Arc<Mutex<mpsc::Receiver<ClientState>>> {
127 &self.private_state_message_receiver
128 }
129 pub fn private_state_message_sender(&self) -> &mpsc::Sender<ClientState> {
130 &self.private_state_message_sender
131 }
132 pub fn pub_state_message_sender(&self) -> &Option<mpsc::Sender<StateMessage>> {
133 &self.pub_state_message_sender
134 }
135 pub fn pub_state_message_receiver(&self) -> &Option<Arc<Mutex<mpsc::Receiver<StateMessage>>>> {
136 &self.pub_state_message_receiver
137 }
138}
139
140type ClientsMap<S> = HashMap<ClientId, ClientContext<S>>;
141
142#[derive(Debug, Clone)]
145pub struct Worker<A, S>
146where
147 A: AuthStateHandler + Send + Sync + 'static,
148 S: TdLibClient + Send + Sync + Clone + 'static,
149{
150 run_flag: Arc<AtomicBool>,
151 auth_state_handler: Arc<A>,
152 read_updates_timeout: Duration,
153 channels_send_timeout: Duration,
154 tdlib_client: S,
155 clients: Arc<RwLock<ClientsMap<S>>>,
156}
157
158impl Worker<ConsoleAuthStateHandler, TdJson> {
159 pub fn builder() -> WorkerBuilder<ConsoleAuthStateHandler, TdJson> {
160 WorkerBuilder::default()
161 }
162}
163
164impl<A, T> Worker<A, T>
165where
166 A: AuthStateHandler + Send + Sync + 'static,
167 T: TdLibClient + Send + Sync + Clone + 'static,
168{
169 pub async fn get_client_state(
171 &self,
172 client: &Client<T>,
173 ) -> Result<(ClientState, AuthorizationState)> {
174 let state = client
175 .get_authorization_state(GetAuthorizationState::builder().build())
176 .await?;
177 match &state {
178 AuthorizationState::_Default => {
179 panic!()
180 }
181 AuthorizationState::Closed(_) => Ok((ClientState::Closed, state)),
182 AuthorizationState::Closing(_) => Ok((ClientState::Closed, state)),
183 AuthorizationState::LoggingOut(_) => Ok((ClientState::Closed, state)),
184 AuthorizationState::Ready(_) => Ok((ClientState::Opened, state)),
185 AuthorizationState::WaitCode(_) => Ok((ClientState::Authorizing, state)),
186 AuthorizationState::WaitEncryptionKey(_) => Ok((ClientState::Authorizing, state)),
187 AuthorizationState::WaitOtherDeviceConfirmation(_) => {
188 Ok((ClientState::Authorizing, state))
189 }
190 AuthorizationState::WaitPassword(_) => Ok((ClientState::Authorizing, state)),
191 AuthorizationState::WaitPhoneNumber(_) => Ok((ClientState::Authorizing, state)),
192 AuthorizationState::WaitRegistration(_) => Ok((ClientState::Authorizing, state)),
193 AuthorizationState::WaitTdlibParameters(_) => Ok((ClientState::Authorizing, state)),
194 AuthorizationState::GetAuthorizationState(_) => {
195 panic!()
196 }
197 }
198 }
199
200 pub async fn reset_auth(&mut self, client: &mut Client<T>) -> Result<()> {
203 client.stop().await?;
204 let client_id = client.take_client_id()?;
205 self.clients.write().await.remove(&client_id);
206 Ok(())
207 }
208
209 pub async fn wait_auth_state_change(&self, client: &Client<T>) -> Result<StateMessage> {
213 let client_id = client.get_client_id().ok_or(CLIENT_NOT_AUTHORIZED)?;
214 match self.clients.read().await.get(&client_id) {
215 None => {Err(Error::BadRequest("client not authorized yet"))}
216 Some(v) => {
217 match v.pub_state_message_receiver() {
218 None => {Err(Error::BadRequest("state receiver not specified, need to call `Client::builder().with_auth_state_channel(...) before Worker::bind_client(...)"))}
219 Some(rec) => {
220 let state = rec.lock().await.recv().await.ok_or(Error::Internal("can't receive state: channel closed"))?;
221 Ok(state)
222 }
223 }
224 }
225 }
226 }
227
228 pub async fn wait_client_state(&self, client: &Client<T>) -> Result<ClientState> {
232 let guard = self.clients.read().await;
233 match guard.get(&client.get_client_id().ok_or(CLIENT_NOT_AUTHORIZED)?) {
234 None => Err(Error::BadRequest("client not bound yet")),
235 Some(ctx) => {
236 let mut rec = ctx.private_state_message_receiver().lock().await;
237 Ok(rec.recv().await.unwrap())
238 }
239 }
240 }
241
242 pub async fn bind_client(&mut self, mut client: Client<T>) -> Result<Client<T>> {
245 if !self.is_running() {
246 return Err(Error::BadRequest("worker not started yet"));
247 };
248 let client_id = client.get_tdlib_client().new_client();
249 log::debug!("new client created: {}", client_id);
250 client.set_client_id(client_id)?;
251 self.store_client_context(&client).await?;
252
253 Ok(client)
254 }
255
256 pub async fn reload_client(&mut self, mut client: Client<T>) -> Result<Client<T>> {
257 if !self.is_running() {
258 return Err(Error::BadRequest("worker not started yet"));
259 };
260 let client_id = client.get_tdlib_client().new_client();
261 log::debug!("new client created: {}", client_id);
262 let old_client_id = client.reload(client_id).await?;
263 self.store_client_context(&client).await?;
264 self.clients.write().await.remove(&old_client_id);
265
266 Ok(client)
267 }
268
269 async fn store_client_context(&self, client: &Client<T>) -> Result<()> {
270 let (sx, rx) = match client.get_auth_state_channel_size() {
271 None => (None, None),
272 Some(size) => {
273 let (sx, rx) = mpsc::channel(size);
274 (Some(sx), Some(Arc::new(Mutex::new(rx))))
275 }
276 };
277
278 let (psx, prx) = mpsc::channel::<ClientState>(5);
279 let ctx = ClientContext {
280 client: client.clone(),
281 pub_state_message_sender: sx,
282 pub_state_message_receiver: rx,
283 private_state_message_receiver: Arc::new(Mutex::new(prx)),
284 private_state_message_sender: psx,
285 };
286
287 let client_id = client.get_client_id().ok_or(CLIENT_NOT_AUTHORIZED)?;
288
289 self.clients.write().await.insert(client_id, ctx);
290 log::debug!("new client added");
291
292 first_internal_request(&client.get_tdlib_client(), client_id).await;
295
296 log::trace!("received first internal response");
297
298 Ok(())
299 }
300
301 pub fn is_running(&self) -> bool {
303 self.run_flag.load(Ordering::Acquire)
304 }
305
306 #[cfg(test)]
307 pub async fn set_client(&mut self, mut client: Client<T>) -> Client<T> {
309 let client_id = client.get_tdlib_client().new_client();
310 log::debug!("new client created: {}", client_id);
311 client.set_client_id(client_id).unwrap();
312
313 let (psx, prx) = mpsc::channel::<ClientState>(5);
314 let ctx = ClientContext {
315 client: client.clone(),
316 pub_state_message_sender: None,
317 pub_state_message_receiver: None,
318 private_state_message_receiver: Arc::new(Mutex::new(prx)),
319 private_state_message_sender: psx,
320 };
321
322 self.clients.write().await.insert(client_id, ctx);
323 client
324 }
325
326 pub(crate) fn new(
328 auth_state_handler: A,
329 read_updates_timeout: f64,
330 channels_send_timeout: f64,
331 tdlib_client: T,
332 ) -> Self {
333 let run_flag = Arc::new(AtomicBool::new(false));
334 let clients: ClientsMap<T> = HashMap::new();
335
336 Self {
337 run_flag,
338 tdlib_client,
339 read_updates_timeout: time::Duration::from_secs_f64(read_updates_timeout),
340 channels_send_timeout: time::Duration::from_secs_f64(channels_send_timeout),
341 auth_state_handler: Arc::new(auth_state_handler),
342 clients: Arc::new(RwLock::new(clients)),
343 }
344 }
345
346 pub fn start(&mut self) -> JoinHandle<()> {
349 let (auth_sx, auth_rx) = mpsc::channel::<UpdateAuthorizationState>(20);
350
351 self.run_flag.store(true, Ordering::Release);
352 let updates_handle = self.init_updates_task(auth_sx);
353 let auth_handle = self.init_auth_task(auth_rx);
354
355 let run_flag = self.run_flag.clone();
356
357 tokio::spawn(async move {
358 tokio::select! {
359 _ = auth_handle => {
360 log::debug!("authorization task stopped");
361 },
362 _ = updates_handle => {
363 log::debug!("updates task stopped");
364 },
365 };
366 run_flag.store(false, Ordering::Release);
367 })
368 }
369
370 pub fn stop(&self) {
373 self.run_flag.store(false, Ordering::Release);
374 }
375
376 fn init_updates_task(&self, auth_sx: mpsc::Sender<UpdateAuthorizationState>) -> JoinHandle<()> {
378 let run_flag = self.run_flag.clone();
379 let clients = self.clients.clone();
380 let recv_timeout = self.read_updates_timeout;
381 let send_timeout = self.channels_send_timeout;
382 let tdlib_client = Arc::new(self.tdlib_client.clone());
383
384 tokio::spawn(async move {
385 let current = tokio::runtime::Handle::try_current().unwrap();
386 while run_flag.load(Ordering::Acquire) {
387 let cl = tdlib_client.clone();
388 if let Some(json) = current
389 .spawn_blocking(move || cl.receive(recv_timeout.as_secs_f64()))
390 .await
391 .unwrap()
392 {
393 log::trace!("received json from tdlib: {}", json);
394 handle_td_resp_received(json.as_str(), &auth_sx, &clients, send_timeout).await;
395 }
396 }
397 })
398 }
399
400 pub async fn handle_auth_state(
401 &self,
402 auth_state: &AuthorizationState,
403 client: &Client<T>,
404 ) -> Result<()> {
405 let clients_guard = self.clients.read().await;
406 match clients_guard.get(&client.get_client_id().ok_or(CLIENT_NOT_AUTHORIZED)?) {
407 None => Err(Error::BadRequest("client not bound yet")),
408 Some(ctx) => {
409 handle_auth_state(
410 client,
411 ctx.pub_state_message_sender(),
412 ctx.private_state_message_sender(),
413 self.auth_state_handler.as_ref(),
414 auth_state,
415 self.channels_send_timeout,
416 )
417 .await
418 }
419 }
420 }
421
422 fn init_auth_task(
424 &self,
425 mut auth_rx: mpsc::Receiver<UpdateAuthorizationState>,
426 ) -> JoinHandle<()> {
427 let auth_state_handler = self.auth_state_handler.clone();
428 let clients = self.clients.clone();
429 let send_timeout = self.channels_send_timeout;
430
431 tokio::spawn(async move {
432 while let Some(auth_state) = auth_rx.recv().await {
433 log::debug!("received new auth state: {:?}", auth_state);
434 if let Some(client_id) = auth_state.client_id() {
435 let result = match clients.read().await.get(&client_id) {
436 None => {
437 log::warn!("found auth updates for unavailable client ({})", client_id);
438 continue;
439 }
440 Some(client_ctx) => {
441 handle_auth_state(
442 client_ctx.client(),
443 client_ctx.pub_state_message_sender(),
444 client_ctx.private_state_message_sender(),
445 auth_state_handler.as_ref(),
446 auth_state.authorization_state(),
447 send_timeout,
448 )
449 .await
450 }
451 };
452
453 match result {
454 Ok(_) => {
455 log::debug!("state changes handled properly")
456 }
457 Err(err) => {
458 match clients.read().await.get(&client_id) {
459 None => {
460 log::error!("client not found")
461 }
462 Some(cl) => match cl.pub_state_message_sender() {
463 Some(state_sender) => {
464 if let Err(err) = state_sender
465 .send_timeout(Err((err, auth_state)), send_timeout)
466 .await
467 {
468 log::error!("cannot send client state changes: {}", err)
469 }
470 }
471 None => {
472 log::warn!("error received and possibly cannot be handled because of empty state receiver for client {client_id}: {err}")
473 }
474 },
475 };
476 }
477 }
478 }
479 }
480 })
481 }
482}
483
484async fn handle_td_resp_received<S: TdLibClient + Send + Sync + Clone>(
485 response: &str,
486 auth_sx: &mpsc::Sender<UpdateAuthorizationState>,
487 clients: &RwLock<ClientsMap<S>>,
488 send_timeout: Duration,
489) {
490 match serde_json::from_str::<serde_json::Value>(response) {
491 Err(e) => log::error!("can't deserialize tdlib data: {}", e),
492 Ok(t) => {
493 if let Some(t) = OBSERVER.notify(t) {
494 match serde_json::from_value::<Update>(t) {
495 Err(err) => {
496 log::error!("cannot deserialize to update: {err:?}, data: {response:?}")
497 }
498 Ok(update) => {
499 if let Update::AuthorizationState(auth_state) = update {
500 log::trace!("auth state send: {:?}", auth_state);
501 match auth_sx.send_timeout(auth_state, send_timeout).await {
502 Ok(_) => {
503 log::trace!("auth state sent");
504 }
505 Err(err) => {
506 log::error!("can't send auth state update: {}", err)
507 }
508 };
509 } else if let Some(client_id) = update.client_id() {
510 match clients.read().await.get(&client_id) {
511 None => {
512 log::warn!(
513 "found updates for unavailable client ({})",
514 client_id
515 )
516 }
517 Some(ctx) => {
518 if let Some(sender) = ctx.client().updates_sender() {
519 log::trace!("sending update to client");
520 match sender
521 .send_timeout(Box::new(update), send_timeout)
522 .await
523 {
524 Ok(_) => {
525 log::trace!("update sent");
526 }
527 Err(err) => {
528 log::error!("can't send update: {}", err)
529 }
530 };
531 }
532 }
533 }
534 }
535 }
536 }
537 }
538 }
539 }
540}
541
542impl<A, S> Drop for Worker<A, S>
543where
544 A: AuthStateHandler + Send + Sync + 'static,
545 S: TdLibClient + Send + Sync + Clone + 'static,
546{
547 fn drop(&mut self) {
548 self.stop();
549 }
550}
551
552async fn handle_auth_state<A, R>(
553 client: &Client<R>,
554 pub_state_sender: &Option<mpsc::Sender<StateMessage>>,
555 private_state_sender: &mpsc::Sender<ClientState>,
556 auth_state_handler: &A,
557 state: &AuthorizationState,
558 send_state_timeout: Duration,
559) -> Result<()>
560where
561 A: AuthStateHandler + Sync,
562 R: TdLibClient + Clone,
563{
564 log::debug!("handling new auth state: {:?}", state);
565 let mut result_state = None;
566 let res = match state {
567 AuthorizationState::_Default => Ok(()),
568 AuthorizationState::Closing(_) => Ok(()),
569 AuthorizationState::LoggingOut(_) => Ok(()),
570 AuthorizationState::Closed(_) => {
571 result_state = Some(ClientState::Closed);
572 Ok(())
573 }
574 AuthorizationState::Ready(_) => {
575 log::debug!("ready state received, send signal");
576 result_state = Some(ClientState::Opened);
577 Ok(())
578 }
579 AuthorizationState::WaitCode(wait_code) => {
580 let code = auth_state_handler
581 .handle_wait_code(client.get_auth_handler(), wait_code)
582 .await;
583 client
584 .check_authentication_code(CheckAuthenticationCode::builder().code(code).build())
585 .await?;
586 Ok(())
587 }
588 AuthorizationState::WaitEncryptionKey(wait_encryption_key) => {
589 let key = auth_state_handler
590 .handle_encryption_key(client.get_auth_handler(), wait_encryption_key)
591 .await;
592 log::debug!("checking encryption key");
593 client
594 .check_database_encryption_key(
595 CheckDatabaseEncryptionKey::builder()
596 .encryption_key(key)
597 .build(),
598 )
599 .await?;
600 log::debug!("encryption key check done");
601 Ok(())
602 }
603 AuthorizationState::WaitOtherDeviceConfirmation(wait_device_confirmation) => {
604 log::debug!("handling other device confirmation");
605 auth_state_handler
606 .handle_other_device_confirmation(
607 client.get_auth_handler(),
608 wait_device_confirmation,
609 )
610 .await;
611 log::debug!("handled other device confirmation");
612 Ok(())
613 }
614 AuthorizationState::WaitPassword(wait_password) => {
615 let password = auth_state_handler
616 .handle_wait_password(client.get_auth_handler(), wait_password)
617 .await;
618 log::debug!("checking password");
619 client
620 .check_authentication_password(
621 CheckAuthenticationPassword::builder()
622 .password(password)
623 .build(),
624 )
625 .await?;
626 log::debug!("password checked");
627 Ok(())
628 }
629 AuthorizationState::WaitPhoneNumber(wait_phone_number) => {
630 let identifier = auth_state_handler
631 .handle_wait_client_identifier(client.get_auth_handler(), wait_phone_number)
632 .await;
633 match identifier {
634 ClientIdentifier::BotToken(token) => {
635 client
636 .check_authentication_bot_token(
637 CheckAuthenticationBotToken::builder().token(token).build(),
638 )
639 .await?;
640 Ok(())
641 }
642 ClientIdentifier::PhoneNumber(phone) => {
643 client
644 .set_authentication_phone_number(
645 SetAuthenticationPhoneNumber::builder()
646 .phone_number(phone)
647 .build(),
648 )
649 .await?;
650 Ok(())
651 }
652 }
653 }
654 AuthorizationState::WaitRegistration(wait_registration) => {
655 log::debug!("handling wait registration");
656 let (first_name, last_name) = auth_state_handler
657 .handle_wait_registration(client.get_auth_handler(), wait_registration)
658 .await;
659 let register = RegisterUser::builder()
660 .first_name(first_name)
661 .last_name(last_name)
662 .build();
663 client.register_user(register).await?;
664 log::debug!("handled register user");
665 Ok(())
666 }
667 AuthorizationState::WaitTdlibParameters(_) => {
668 log::debug!("going to set tdlib parameters");
669 client
670 .set_tdlib_parameters(
671 SetTdlibParameters::builder()
672 .parameters(client.tdlib_parameters())
673 .build(),
674 )
675 .await?;
676 log::debug!("tdlib parameters set");
677 Ok(())
678 }
679 AuthorizationState::GetAuthorizationState(_) => Err(Error::Internal(
680 "retrieved GetAuthorizationState update but observer not found any subscriber",
681 )),
682 };
683
684 match &result_state {
685 None => {}
686 Some(state) => {
687 if let Err(err) = private_state_sender.send(state.clone()).await {
688 {
689 log::error!(
690 "can't send state update, but state changed; error: {:?}, state: {:?}",
691 err,
692 state
693 )
694 };
695 }
696
697 if let Some(sender) = &pub_state_sender {
698 if let Err(err) = sender
699 .send_timeout(Ok(state.clone()), send_state_timeout)
700 .await
701 {
702 log::error!(
703 "can't send state update, but state changed; error: {:?}, state: {:?}",
704 err,
705 state
706 )
707 };
708 }
709 }
710 }
711 res
712}
713
714async fn first_internal_request<S: TdLibClient>(tdlib_client: &S, client_id: ClientId) {
715 let req = GetApplicationConfig::builder().build();
716 let extra = match req.as_ref().extra().ok_or(Error::Internal(
717 "invalid tdlib response type, not have `extra` field",
718 )) {
719 Ok(v) => v,
720 Err(err) => {
721 log::error!("{}", err);
722 return;
723 }
724 };
725 let signal = OBSERVER.subscribe(extra);
726 if let Err(err) = tdlib_client.send(client_id, req.as_ref()) {
727 log::error!("{}", err);
728 return;
729 };
730
731 let received = signal.await;
732 OBSERVER.unsubscribe(extra);
733 match received {
734 Err(_) => log::error!("receiver already closed"),
735 Ok(v) => {
736 log::trace!("first internal response: {v}");
737 if let Err(e) = serde_json::from_value::<JsonValue>(v) {
738 log::warn!("invalid first internal response received: {}", e)
739 }
740 }
741 };
742}
743
744#[cfg(test)]
745mod tests {
746 use crate::client::tdlib_client::TdLibClient;
747 use crate::client::worker::Worker;
748 use crate::client::Client;
749 use crate::errors::Result;
750 use crate::tdjson;
751 use crate::types::{Chats, RFunction, RObject, SearchPublicChats, TdlibParameters};
752 use std::time::Duration;
753 use tokio::time::timeout;
754
755 #[derive(Clone)]
756 struct MockedRawApi {
757 to_receive: Option<String>,
758 }
759
760 impl MockedRawApi {
761 pub fn set_to_receive(&mut self, value: String) {
762 log::trace!("delayed to receive: {}", value);
763 self.to_receive = Some(value);
764 }
765
766 pub fn new() -> Self {
767 Self { to_receive: None }
768 }
769 }
770
771 impl TdLibClient for MockedRawApi {
772 fn send<Fnc: RFunction>(&self, _client_id: tdjson::ClientId, _fnc: Fnc) -> Result<()> {
773 Ok(())
774 }
775
776 fn receive(&self, _timeout: f64) -> Option<String> {
777 self.to_receive.clone()
778 }
779
780 fn execute<Fnc: RFunction>(&self, _fnc: Fnc) -> Result<Option<String>> {
781 unimplemented!()
782 }
783
784 fn new_client(&self) -> tdjson::ClientId {
785 1
786 }
787 }
788
789 #[tokio::test]
790 async fn test_start_and_auth() {
791 let mocked_raw_api = MockedRawApi::new();
792 let mut worker = Worker::builder()
793 .with_tdlib_client(mocked_raw_api.clone())
794 .build()
795 .unwrap();
796 let res = timeout(
797 Duration::from_millis(50),
798 worker.bind_client(
799 Client::builder()
800 .with_tdlib_client(mocked_raw_api.clone())
801 .with_tdlib_parameters(TdlibParameters::builder().build())
802 .build()
803 .unwrap(),
804 ),
805 )
806 .await;
807 match res {
808 Err(e) => panic!("{:?}", e),
809 Ok(v) => match v {
810 Err(e) => assert_eq!(e.to_string(), "worker not started yet".to_string()),
811 Ok(_) => panic!("error not raised"),
812 },
813 };
814
815 worker.start();
816 let res = timeout(
818 Duration::from_millis(50),
819 worker.bind_client(
820 Client::builder()
821 .with_tdlib_client(mocked_raw_api.clone())
822 .with_tdlib_parameters(TdlibParameters::builder().build())
823 .build()
824 .unwrap(),
825 ),
826 )
827 .await;
828 match res {
829 Err(_) => {}
830 _ => panic!("error not raised"),
831 };
832 }
833
834 #[tokio::test]
835 async fn test_request_flow() {
836 let mut mocked_raw_api = MockedRawApi::new();
837
838 let search_req = SearchPublicChats::builder().build();
839 let chats = Chats::builder().chat_ids(vec![1, 2, 3]).build();
840 let chats: serde_json::Value = serde_json::to_value(chats).unwrap();
841 let mut chats_object = chats.as_object().unwrap().clone();
842 chats_object.insert(
843 "@client_id".to_string(),
844 serde_json::Value::Number(1.into()),
845 );
846 chats_object.insert(
847 "@extra".to_string(),
848 serde_json::Value::String(search_req.extra().unwrap().to_string()),
849 );
850 chats_object.insert(
851 "@type".to_string(),
852 serde_json::Value::String("chats".to_string()),
853 );
854 let to_receive = serde_json::to_string(&chats_object).unwrap();
855 mocked_raw_api.set_to_receive(to_receive);
856 log::trace!("chats objects: {:?}", chats_object);
857
858 let mut worker = Worker::builder()
859 .with_tdlib_client(mocked_raw_api.clone())
860 .build()
861 .unwrap();
862 worker.start();
863
864 let client = worker
865 .set_client(
866 Client::builder()
867 .with_tdlib_client(mocked_raw_api.clone())
868 .with_tdlib_parameters(TdlibParameters::builder().build())
869 .build()
870 .unwrap(),
871 )
872 .await;
873
874 match timeout(
875 Duration::from_secs(10),
876 client.search_public_chats(search_req),
877 )
878 .await
879 {
880 Err(_) => panic!("did not receive response within 1 s"),
881 Ok(Err(e)) => panic!("{}", e),
882 Ok(Ok(result)) => assert_eq!(result.chat_ids(), &vec![1, 2, 3]),
883 }
884 }
885}