1use crate::enums::event::Event;
2use crate::enums::internal_event::InternalEvent;
3use crate::enums::msnp_list::MsnpList;
4use crate::enums::msnp_status::MsnpStatus;
5use crate::errors::contact_error::ContactError;
6use crate::errors::sdk_error::SdkError;
7#[cfg(feature = "uniffi")]
8use crate::event_handler::EventHandler;
9#[cfg(feature = "config")]
10use crate::http::config::Config;
11use crate::http::http_client::HttpClient;
12use crate::models::personal_message::PersonalMessage;
13use crate::models::presence::Presence;
14use crate::models::user_data::UserData;
15use crate::notification_server::commands::{
16 adc, adg, blp, chg, cvr, gcf, gtc, prp, reg, rem, rmg, sbp, syn, usr_i, usr_s, uux, ver, xfr,
17};
18use crate::notification_server::event_matcher::{into_event, into_internal_event};
19use crate::receive_split::receive_split;
20use crate::switchboard_server::switchboard::Switchboard;
21use base64::{Engine as _, engine::general_purpose::STANDARD};
22use core::str;
23use log::{error, trace};
24use std::sync::Arc;
25use std::sync::atomic::AtomicU32;
26use std::time::Duration;
27use tokio::io::AsyncWriteExt;
28use tokio::net::{TcpStream, lookup_host};
29use tokio::sync::{RwLock, broadcast, mpsc};
30use tokio_util::sync::CancellationToken;
31
32pub struct Client {
34 event_tx: async_channel::Sender<Event>,
35 event_rx: async_channel::Receiver<Event>,
36 ns_tx: mpsc::Sender<Vec<u8>>,
37 internal_tx: broadcast::Sender<InternalEvent>,
38 tr_id: AtomicU32,
39 user_data: Arc<RwLock<UserData>>,
40 http_client: HttpClient,
41 cancellation_token: CancellationToken,
42}
43
44impl Client {
45 pub async fn new(server: &str, port: u16) -> Result<Self, SdkError> {
47 let mut server_ips = lookup_host((server, port))
48 .await
49 .or(Err(SdkError::ResolutionError))?;
50
51 let server_ip = server_ips
52 .find(|ip| ip.is_ipv4())
53 .ok_or(SdkError::ResolutionError)?
54 .ip();
55
56 let (event_tx, event_rx) = async_channel::unbounded();
57 let (ns_tx, mut ns_rx) = mpsc::channel::<Vec<u8>>(256);
58 let (internal_tx, _) = broadcast::channel::<InternalEvent>(256);
59
60 let socket = TcpStream::connect((server_ip, port))
61 .await
62 .or(Err(SdkError::ServerError))?;
63
64 let (mut rd, mut wr) = socket.into_split();
65 let task_internal_tx = internal_tx.clone();
66 let task_event_tx = event_tx.clone();
67
68 let cancellation_token = CancellationToken::new();
69 let task_cancellation_token = cancellation_token.clone();
70
71 tokio::spawn(async move {
72 'outer: while let Ok(messages) =
73 receive_split(&mut rd, task_cancellation_token.clone()).await
74 {
75 for message in messages {
76 let internal_event = into_internal_event(&message);
77 if let Err(error) = task_internal_tx.send(internal_event) {
78 error!("{error}");
79 }
80
81 let event = into_event(&message);
82 if let Some(event) = event {
83 let disconnected =
84 matches!(event, Event::Disconnected | Event::LoggedInAnotherDevice);
85
86 if let Err(error) = task_event_tx.send(event).await {
87 error!("{error}");
88 break 'outer;
89 }
90
91 if disconnected {
92 task_event_tx.close();
93 break 'outer;
94 }
95 }
96 }
97 }
98
99 if let Err(error) = task_event_tx.send(Event::Disconnected).await {
100 error!("{error}");
101 }
102
103 task_event_tx.close();
104 task_cancellation_token.cancel();
105 });
106
107 let task_event_tx = event_tx.clone();
108 let task_cancellation_token = cancellation_token.clone();
109
110 tokio::spawn(async move {
111 loop {
112 tokio::select! {
113 message = ns_rx.recv() => {
114 if let Some(message) = message {
115 if let Err(error) = wr.write_all(&message).await {
116 error!("{error}")
117 }
118 } else {
119 break;
120 }
121 }
122
123 _ = task_cancellation_token.cancelled() => {
124 break;
125 }
126 }
127 }
128
129 if let Err(error) = task_event_tx.send(Event::Disconnected).await {
130 error!("{error}");
131 }
132
133 task_event_tx.close();
134 task_cancellation_token.cancel();
135 });
136
137 Ok(Self {
138 event_tx,
139 event_rx,
140 ns_tx,
141 internal_tx,
142 tr_id: AtomicU32::new(0),
143 user_data: Arc::new(RwLock::new(UserData::new())),
144 http_client: HttpClient::new(),
145 cancellation_token,
146 })
147 }
148
149 fn start_pinging(&self) {
150 let event_tx = self.event_tx.clone();
151 let ns_tx = self.ns_tx.clone();
152 let mut internal_rx = self.internal_tx.subscribe();
153 let task_cancellation_token = self.cancellation_token.clone();
154
155 tokio::spawn(async move {
156 let command = "PNG\r\n";
157 'outer: while ns_tx.send(command.as_bytes().to_vec()).await.is_ok() {
158 trace!("C: {command}");
159 loop {
160 tokio::select! {
161 reply = internal_rx.recv() => {
162 if let Ok(InternalEvent::ServerReply(reply)) = reply {
163 trace!("S: {reply}");
164
165 let mut args = reply.split_ascii_whitespace();
166 if args.next().unwrap_or("") == "QNG" {
167 if let Ok(duration) = args.next().unwrap_or("").parse()
169 && duration > 5
170 {
171 tokio::time::sleep(Duration::from_secs(duration)).await;
172 break;
173 } else {
174 break 'outer;
175 }
176 }
177 }
178 }
179
180 _ = task_cancellation_token.cancelled() => {
181 break 'outer;
182 }
183 }
184 }
185 }
186
187 if let Err(error) = event_tx.send(Event::Disconnected).await {
188 error!("{error}");
189 }
190
191 event_tx.close();
192 task_cancellation_token.cancel();
193 });
194 }
195
196 fn handle_switchboard_invitations(&self) {
197 let event_tx = self.event_tx.clone();
198 let mut internal_rx = self.internal_tx.subscribe();
199 let user_data = self.user_data.clone();
200 let task_cancellation_token = self.cancellation_token.clone();
201
202 tokio::spawn(async move {
203 loop {
204 tokio::select! {
205 event = internal_rx.recv() => {
206 if let Ok(event) = event && let InternalEvent::SwitchboardInvitation {
207 server,
208 port,
209 session_id,
210 cki_string,
211 } = event
212 {
213 let switchboard = Switchboard::new(
214 server.as_str(),
215 port.as_str(),
216 cki_string.as_str(),
217 user_data.clone(),
218 )
219 .await;
220
221 if let Ok(switchboard) = switchboard {
222 let user_data = user_data.read().await;
223 if let Some(ref user_email) = user_data.email
224 && switchboard.answer(user_email, &session_id).await.is_ok()
225 && let Err(error) = event_tx
226 .send(Event::SessionAnswered(Arc::new(switchboard)))
227 .await
228 {
229 error!("{error}");
230 break;
231 }
232 }
233 }
234 }
235
236 _ = task_cancellation_token.cancelled() => {
237 break;
238 }
239 }
240 }
241 });
242 }
243
244 pub fn add_event_handler_closure<F, R>(&self, f: F)
247 where
248 F: Fn(Event) -> R + Send + 'static,
249 R: Future<Output = ()> + Send,
250 {
251 let event_rx = self.event_rx.clone();
252 tokio::spawn(async move {
253 while let Ok(event) = event_rx.recv().await {
254 f(event).await;
255 }
256 });
257 }
258
259 #[cfg(feature = "uniffi")]
260 pub fn add_event_handler(&self, handler: Arc<dyn EventHandler>) {
265 let event_rx = self.event_rx.clone();
266 tokio::spawn(async move {
267 while let Ok(event) = event_rx.recv().await {
268 handler.handle(event).await;
269 }
270 });
271 }
272
273 pub async fn login(
280 &self,
281 email: String,
282 password: &str,
283 nexus_url: &str,
284 client_name: &str,
285 version: &str,
286 ) -> Result<Event, SdkError> {
287 let mut internal_rx = self.internal_tx.subscribe();
288
289 ver::send(&self.tr_id, &self.ns_tx, &mut internal_rx).await?;
290 cvr::send(
291 &self.tr_id,
292 &self.ns_tx,
293 &mut internal_rx,
294 &email,
295 client_name,
296 version,
297 )
298 .await?;
299
300 let authorization_string =
301 match usr_i::send(&self.tr_id, &self.ns_tx, &mut internal_rx, &email).await? {
302 InternalEvent::GotAuthorizationString(authorization_string) => authorization_string,
303 InternalEvent::RedirectedTo { server, port } => {
304 return Ok(Event::RedirectedTo { server, port });
305 }
306
307 _ => return Err(SdkError::CouldNotGetAuthenticationString),
308 };
309
310 let token = self
311 .http_client
312 .get_passport_token(&email, password, nexus_url, &authorization_string)
313 .await?;
314
315 usr_s::send(&self.tr_id, &self.ns_tx, &mut internal_rx, &token).await?;
316
317 {
318 let mut user_data = self.user_data.write().await;
319 user_data.email = Some(email);
320 }
321
322 syn::send(&self.tr_id, &self.ns_tx, &mut internal_rx).await?;
323 gcf::send(&self.tr_id, &self.ns_tx, &mut internal_rx).await?;
324
325 self.handle_switchboard_invitations();
326 self.start_pinging();
327
328 Ok(Event::Authenticated)
329 }
330
331 #[cfg(feature = "config")]
332 pub async fn get_config(&self, config_url: &str) -> Result<Config, SdkError> {
334 self.http_client
335 .get_config(config_url)
336 .await
337 .or(Err(SdkError::ConfigRequestError))
338 }
339
340 pub async fn set_presence(&self, presence: MsnpStatus) -> Result<(), SdkError> {
342 let mut internal_rx = self.internal_tx.subscribe();
343 let presence = Presence::new_without_object(presence);
344 let user_data = self.user_data.read().await;
345
346 chg::send(
347 &self.tr_id,
348 &self.ns_tx,
349 &mut internal_rx,
350 &presence,
351 user_data.msn_object.as_deref(),
352 )
353 .await
354 }
355
356 pub async fn set_personal_message(
358 &self,
359 personal_message: &PersonalMessage,
360 ) -> Result<(), SdkError> {
361 let mut internal_rx = self.internal_tx.subscribe();
362 uux::send(&self.tr_id, &self.ns_tx, &mut internal_rx, personal_message).await
363 }
364
365 pub async fn set_display_name(&self, display_name: &str) -> Result<(), SdkError> {
367 let mut internal_rx = self.internal_tx.subscribe();
368 prp::send(&self.tr_id, &self.ns_tx, &mut internal_rx, display_name).await
369 }
370
371 pub async fn set_contact_display_name(
373 &self,
374 guid: &str,
375 display_name: &str,
376 ) -> Result<(), ContactError> {
377 let mut internal_rx = self.internal_tx.subscribe();
378 sbp::send(
379 &self.tr_id,
380 &self.ns_tx,
381 &mut internal_rx,
382 guid,
383 display_name,
384 )
385 .await
386 }
387
388 pub async fn add_contact(
390 &self,
391 email: &str,
392 display_name: &str,
393 list: MsnpList,
394 ) -> Result<Event, ContactError> {
395 let mut internal_rx = self.internal_tx.subscribe();
396 adc::send(
397 &self.tr_id,
398 &self.ns_tx,
399 &mut internal_rx,
400 email,
401 display_name,
402 list,
403 )
404 .await
405 }
406
407 pub async fn remove_contact(&self, email: &str, list: MsnpList) -> Result<(), ContactError> {
410 let mut internal_rx = self.internal_tx.subscribe();
411 rem::send(&self.tr_id, &self.ns_tx, &mut internal_rx, email, list).await
412 }
413
414 pub async fn remove_contact_from_forward_list(&self, guid: &str) -> Result<(), ContactError> {
416 let mut internal_rx = self.internal_tx.subscribe();
417 rem::send_with_forward_list(&self.tr_id, &self.ns_tx, &mut internal_rx, guid).await
418 }
419
420 pub async fn block_contact(&self, email: &str) -> Result<(), ContactError> {
422 let mut internal_rx = self.internal_tx.subscribe();
423 adc::send(
424 &self.tr_id,
425 &self.ns_tx,
426 &mut internal_rx,
427 email,
428 email,
429 MsnpList::BlockList,
430 )
431 .await?;
432
433 rem::send(
434 &self.tr_id,
435 &self.ns_tx,
436 &mut internal_rx,
437 email,
438 MsnpList::AllowList,
439 )
440 .await
441 }
442
443 pub async fn unblock_contact(&self, email: &str) -> Result<(), ContactError> {
445 let mut internal_rx = self.internal_tx.subscribe();
446 adc::send(
447 &self.tr_id,
448 &self.ns_tx,
449 &mut internal_rx,
450 email,
451 email,
452 MsnpList::AllowList,
453 )
454 .await?;
455
456 rem::send(
457 &self.tr_id,
458 &self.ns_tx,
459 &mut internal_rx,
460 email,
461 MsnpList::BlockList,
462 )
463 .await
464 }
465
466 pub async fn create_group(&self, name: &str) -> Result<(), ContactError> {
468 let mut internal_rx = self.internal_tx.subscribe();
469 adg::send(&self.tr_id, &self.ns_tx, &mut internal_rx, name).await
470 }
471
472 pub async fn delete_group(&self, guid: &str) -> Result<(), ContactError> {
474 let mut internal_rx = self.internal_tx.subscribe();
475 rmg::send(&self.tr_id, &self.ns_tx, &mut internal_rx, guid).await
476 }
477
478 pub async fn rename_group(&self, guid: &str, new_name: &str) -> Result<(), ContactError> {
480 let mut internal_rx = self.internal_tx.subscribe();
481 reg::send(&self.tr_id, &self.ns_tx, &mut internal_rx, guid, new_name).await
482 }
483
484 pub async fn add_contact_to_group(
486 &self,
487 guid: &str,
488 group_guid: &str,
489 ) -> Result<(), ContactError> {
490 let mut internal_rx = self.internal_tx.subscribe();
491 adc::send_with_group(&self.tr_id, &self.ns_tx, &mut internal_rx, guid, group_guid).await
492 }
493
494 pub async fn remove_contact_from_group(
496 &self,
497 guid: &str,
498 group_guid: &str,
499 ) -> Result<(), ContactError> {
500 let mut internal_rx = self.internal_tx.subscribe();
501 rem::send_with_group(&self.tr_id, &self.ns_tx, &mut internal_rx, guid, group_guid).await
502 }
503
504 pub async fn set_gtc(&self, gtc: &str) -> Result<(), SdkError> {
506 let mut internal_rx = self.internal_tx.subscribe();
507 gtc::send(&self.tr_id, &self.ns_tx, &mut internal_rx, gtc).await
508 }
509
510 pub async fn set_blp(&self, blp: &str) -> Result<(), SdkError> {
512 let mut internal_rx = self.internal_tx.subscribe();
513 blp::send(&self.tr_id, &self.ns_tx, &mut internal_rx, blp).await
514 }
515
516 pub async fn create_session(&self, email: &str) -> Result<Switchboard, SdkError> {
518 let mut internal_rx = self.internal_tx.subscribe();
519 let switchboard = xfr::send(
520 &self.tr_id,
521 &self.ns_tx,
522 &mut internal_rx,
523 self.user_data.clone(),
524 )
525 .await?;
526
527 let user_data = self.user_data.read().await;
528 let user_email = user_data.email.as_ref().ok_or(SdkError::NotLoggedIn)?;
529
530 switchboard.login(user_email).await?;
531 switchboard.invite(email).await?;
532 Ok(switchboard)
533 }
534
535 pub async fn set_display_picture(&self, display_picture: Vec<u8>) -> Result<String, SdkError> {
538 let mut user_data = self.user_data.write().await;
539 let user_email = user_data.email.as_ref().ok_or(SdkError::NotLoggedIn)?;
540
541 let mut hash = sha1_smol::Sha1::new();
542 hash.update(display_picture.as_slice());
543 let sha1d = STANDARD.encode(hash.digest().bytes());
544
545 let sha1c = format!(
546 "Creator{user_email}Size{}Type3LocationPIC.tmpFriendlyAAA=SHA1D{sha1d}",
547 display_picture.len()
548 );
549
550 let mut hash = sha1_smol::Sha1::new();
551 hash.update(sha1c.as_bytes());
552 let sha1c = STANDARD.encode(hash.digest().bytes());
553
554 user_data.msn_object = Some(format!(
555 "<msnobj Creator=\"{user_email}\" Size=\"{}\" Type=\"3\" Location=\"PIC.tmp\" Friendly=\"AAA=\" SHA1D=\"{sha1d}\" SHA1C=\"{sha1c}\"/>",
556 display_picture.len()
557 ));
558
559 user_data.display_picture = Some(display_picture);
560 Ok(sha1d)
561 }
562
563 pub async fn disconnect(&self) -> Result<(), SdkError> {
565 let command = "OUT\r\n";
566 trace!("C: {command}");
567
568 self.ns_tx
569 .send(command.as_bytes().to_vec())
570 .await
571 .or(Err(SdkError::TransmittingError))?;
572
573 self.event_tx.close();
574 self.cancellation_token.cancel();
575 Ok(())
576 }
577}