Skip to main content

msnp11_sdk/
client.rs

1use crate::enums::event::Event;
2use crate::enums::internal_event::InternalEvent;
3use crate::enums::msnp_list::MsnpList;
4use crate::enums::msnp_status::MsnpStatus;
5use crate::errors::contact_error::ContactError;
6use crate::errors::sdk_error::SdkError;
7#[cfg(feature = "uniffi")]
8use crate::event_handler::EventHandler;
9#[cfg(feature = "config")]
10use crate::http::config::Config;
11use crate::http::http_client::HttpClient;
12use crate::models::personal_message::PersonalMessage;
13use crate::models::presence::Presence;
14use crate::models::user_data::UserData;
15use crate::notification_server::commands::{
16    adc, adg, blp, chg, cvr, gcf, gtc, prp, reg, rem, rmg, sbp, syn, usr_i, usr_s, uux, ver, xfr,
17};
18use crate::notification_server::event_matcher::{into_event, into_internal_event};
19use crate::receive_split::receive_split;
20use crate::switchboard_server::switchboard::Switchboard;
21use base64::{Engine as _, engine::general_purpose::STANDARD};
22use core::str;
23use log::{error, trace};
24use std::sync::Arc;
25use std::sync::atomic::AtomicU32;
26use std::time::Duration;
27use tokio::io::AsyncWriteExt;
28use tokio::net::{TcpStream, lookup_host};
29use tokio::sync::{RwLock, broadcast, mpsc};
30use tokio_util::sync::CancellationToken;
31
32/// Defines the client itself, all Notification Server actions are done through an instance of this struct.
33pub struct Client {
34    event_tx: async_channel::Sender<Event>,
35    event_rx: async_channel::Receiver<Event>,
36    ns_tx: mpsc::Sender<Vec<u8>>,
37    internal_tx: broadcast::Sender<InternalEvent>,
38    tr_id: AtomicU32,
39    user_data: Arc<RwLock<UserData>>,
40    http_client: HttpClient,
41    cancellation_token: CancellationToken,
42}
43
44impl Client {
45    /// Connects to the server, defines the channels and returns a new instance.
46    pub async fn new(server: &str, port: u16) -> Result<Self, SdkError> {
47        let mut server_ips = lookup_host((server, port))
48            .await
49            .or(Err(SdkError::ResolutionError))?;
50
51        let server_ip = server_ips
52            .find(|ip| ip.is_ipv4())
53            .ok_or(SdkError::ResolutionError)?
54            .ip();
55
56        let (event_tx, event_rx) = async_channel::unbounded();
57        let (ns_tx, mut ns_rx) = mpsc::channel::<Vec<u8>>(256);
58        let (internal_tx, _) = broadcast::channel::<InternalEvent>(256);
59
60        let socket = TcpStream::connect((server_ip, port))
61            .await
62            .or(Err(SdkError::ServerError))?;
63
64        let (mut rd, mut wr) = socket.into_split();
65        let task_internal_tx = internal_tx.clone();
66        let task_event_tx = event_tx.clone();
67
68        let cancellation_token = CancellationToken::new();
69        let task_cancellation_token = cancellation_token.clone();
70
71        tokio::spawn(async move {
72            'outer: while let Ok(messages) =
73                receive_split(&mut rd, task_cancellation_token.clone()).await
74            {
75                for message in messages {
76                    let internal_event = into_internal_event(&message);
77                    if let Err(error) = task_internal_tx.send(internal_event) {
78                        error!("{error}");
79                    }
80
81                    let event = into_event(&message);
82                    if let Some(event) = event {
83                        let disconnected =
84                            matches!(event, Event::Disconnected | Event::LoggedInAnotherDevice);
85
86                        if let Err(error) = task_event_tx.send(event).await {
87                            error!("{error}");
88                            break 'outer;
89                        }
90
91                        if disconnected {
92                            task_event_tx.close();
93                            break 'outer;
94                        }
95                    }
96                }
97            }
98
99            if let Err(error) = task_event_tx.send(Event::Disconnected).await {
100                error!("{error}");
101            }
102
103            task_event_tx.close();
104            task_cancellation_token.cancel();
105        });
106
107        let task_event_tx = event_tx.clone();
108        let task_cancellation_token = cancellation_token.clone();
109
110        tokio::spawn(async move {
111            loop {
112                tokio::select! {
113                    message = ns_rx.recv() => {
114                        if let Some(message) = message {
115                            if let Err(error) = wr.write_all(&message).await {
116                                error!("{error}")
117                            }
118                        } else {
119                            break;
120                        }
121                    }
122
123                    _ = task_cancellation_token.cancelled() => {
124                        break;
125                    }
126                }
127            }
128
129            if let Err(error) = task_event_tx.send(Event::Disconnected).await {
130                error!("{error}");
131            }
132
133            task_event_tx.close();
134            task_cancellation_token.cancel();
135        });
136
137        Ok(Self {
138            event_tx,
139            event_rx,
140            ns_tx,
141            internal_tx,
142            tr_id: AtomicU32::new(0),
143            user_data: Arc::new(RwLock::new(UserData::new())),
144            http_client: HttpClient::new(),
145            cancellation_token,
146        })
147    }
148
149    fn start_pinging(&self) {
150        let event_tx = self.event_tx.clone();
151        let ns_tx = self.ns_tx.clone();
152        let mut internal_rx = self.internal_tx.subscribe();
153        let task_cancellation_token = self.cancellation_token.clone();
154
155        tokio::spawn(async move {
156            let command = "PNG\r\n";
157            'outer: while ns_tx.send(command.as_bytes().to_vec()).await.is_ok() {
158                trace!("C: {command}");
159                loop {
160                    tokio::select! {
161                        reply = internal_rx.recv() => {
162                            if let Ok(InternalEvent::ServerReply(reply)) = reply {
163                                trace!("S: {reply}");
164
165                                let mut args = reply.split_ascii_whitespace();
166                                if args.next().unwrap_or("") == "QNG" {
167                                    // Parse and sanity check to avoid spamming the server
168                                    if let Ok(duration) = args.next().unwrap_or("").parse()
169                                        && duration > 5
170                                    {
171                                        tokio::time::sleep(Duration::from_secs(duration)).await;
172                                        break;
173                                    } else {
174                                        break 'outer;
175                                    }
176                                }
177                            }
178                        }
179
180                        _ = task_cancellation_token.cancelled() => {
181                            break 'outer;
182                        }
183                    }
184                }
185            }
186
187            if let Err(error) = event_tx.send(Event::Disconnected).await {
188                error!("{error}");
189            }
190
191            event_tx.close();
192            task_cancellation_token.cancel();
193        });
194    }
195
196    fn handle_switchboard_invitations(&self) {
197        let event_tx = self.event_tx.clone();
198        let mut internal_rx = self.internal_tx.subscribe();
199        let user_data = self.user_data.clone();
200        let task_cancellation_token = self.cancellation_token.clone();
201
202        tokio::spawn(async move {
203            loop {
204                tokio::select! {
205                    event = internal_rx.recv() => {
206                            if let Ok(event) = event && let InternalEvent::SwitchboardInvitation {
207                                server,
208                                port,
209                                session_id,
210                                cki_string,
211                            } = event
212                            {
213                                let switchboard = Switchboard::new(
214                                    server.as_str(),
215                                    port.as_str(),
216                                    cki_string.as_str(),
217                                    user_data.clone(),
218                                )
219                                .await;
220
221                                if let Ok(switchboard) = switchboard {
222                                    let user_data = user_data.read().await;
223                                    if let Some(ref user_email) = user_data.email
224                                        && switchboard.answer(user_email, &session_id).await.is_ok()
225                                        && let Err(error) = event_tx
226                                            .send(Event::SessionAnswered(Arc::new(switchboard)))
227                                            .await
228                                    {
229                                        error!("{error}");
230                                        break;
231                                    }
232                                }
233                            }
234                    }
235
236                    _ = task_cancellation_token.cancelled() => {
237                        break;
238                    }
239                }
240            }
241        });
242    }
243
244    /// Adds a handler closure. If you're using this SDK with Rust, not through a foreign language binding, then this is the preferred
245    /// method of receiving and handling events.
246    pub fn add_event_handler_closure<F, R>(&self, f: F)
247    where
248        F: Fn(Event) -> R + Send + 'static,
249        R: Future<Output = ()> + Send,
250    {
251        let event_rx = self.event_rx.clone();
252        tokio::spawn(async move {
253            while let Ok(event) = event_rx.recv().await {
254                f(event).await;
255            }
256        });
257    }
258
259    #[cfg(feature = "uniffi")]
260    /// Adds a new handler that implements the [EventHandler] trait.
261    ///
262    /// This exists for the foreign language bindings, with which generics don't
263    /// work. Prefer [`add_event_handler_closure`][Client::add_event_handler_closure] if using this SDK with Rust.
264    pub fn add_event_handler(&self, handler: Arc<dyn EventHandler>) {
265        let event_rx = self.event_rx.clone();
266        tokio::spawn(async move {
267            while let Ok(event) = event_rx.recv().await {
268                handler.handle(event).await;
269            }
270        });
271    }
272
273    /// Does the MSNP authentication process. Also starts regular pings and the handler for Switchboard invitations.
274    ///
275    /// # Events
276    /// If the server you're connecting to implements a Dispatch Server, then this will return a [RedirectedTo][Event::RedirectedTo] event.
277    /// What follows is [creating a new][Client::new] client instance with the server and port returned then logging in again, which
278    /// will return an [Authenticated][Event::Authenticated] event.
279    pub async fn login(
280        &self,
281        email: String,
282        password: &str,
283        nexus_url: &str,
284        client_name: &str,
285        version: &str,
286    ) -> Result<Event, SdkError> {
287        let mut internal_rx = self.internal_tx.subscribe();
288
289        ver::send(&self.tr_id, &self.ns_tx, &mut internal_rx).await?;
290        cvr::send(
291            &self.tr_id,
292            &self.ns_tx,
293            &mut internal_rx,
294            &email,
295            client_name,
296            version,
297        )
298        .await?;
299
300        let authorization_string =
301            match usr_i::send(&self.tr_id, &self.ns_tx, &mut internal_rx, &email).await? {
302                InternalEvent::GotAuthorizationString(authorization_string) => authorization_string,
303                InternalEvent::RedirectedTo { server, port } => {
304                    return Ok(Event::RedirectedTo { server, port });
305                }
306
307                _ => return Err(SdkError::CouldNotGetAuthenticationString),
308            };
309
310        let token = self
311            .http_client
312            .get_passport_token(&email, password, nexus_url, &authorization_string)
313            .await?;
314
315        usr_s::send(&self.tr_id, &self.ns_tx, &mut internal_rx, &token).await?;
316
317        {
318            let mut user_data = self.user_data.write().await;
319            user_data.email = Some(email);
320        }
321
322        syn::send(&self.tr_id, &self.ns_tx, &mut internal_rx).await?;
323        gcf::send(&self.tr_id, &self.ns_tx, &mut internal_rx).await?;
324
325        self.handle_switchboard_invitations();
326        self.start_pinging();
327
328        Ok(Event::Authenticated)
329    }
330
331    #[cfg(feature = "config")]
332    /// Makes a request to get the config file (containing tabs and the MSN Today url) and returns it.
333    pub async fn get_config(&self, config_url: &str) -> Result<Config, SdkError> {
334        self.http_client
335            .get_config(config_url)
336            .await
337            .or(Err(SdkError::ConfigRequestError))
338    }
339
340    /// Sets the user's presence status.
341    pub async fn set_presence(&self, presence: MsnpStatus) -> Result<(), SdkError> {
342        let mut internal_rx = self.internal_tx.subscribe();
343        let presence = Presence::new_without_object(presence);
344        let user_data = self.user_data.read().await;
345
346        chg::send(
347            &self.tr_id,
348            &self.ns_tx,
349            &mut internal_rx,
350            &presence,
351            user_data.msn_object.as_deref(),
352        )
353        .await
354    }
355
356    /// Sets the user's personal message.
357    pub async fn set_personal_message(
358        &self,
359        personal_message: &PersonalMessage,
360    ) -> Result<(), SdkError> {
361        let mut internal_rx = self.internal_tx.subscribe();
362        uux::send(&self.tr_id, &self.ns_tx, &mut internal_rx, personal_message).await
363    }
364
365    /// Sets the user's display name.
366    pub async fn set_display_name(&self, display_name: &str) -> Result<(), SdkError> {
367        let mut internal_rx = self.internal_tx.subscribe();
368        prp::send(&self.tr_id, &self.ns_tx, &mut internal_rx, display_name).await
369    }
370
371    /// Sets a contact's display name.
372    pub async fn set_contact_display_name(
373        &self,
374        guid: &str,
375        display_name: &str,
376    ) -> Result<(), ContactError> {
377        let mut internal_rx = self.internal_tx.subscribe();
378        sbp::send(
379            &self.tr_id,
380            &self.ns_tx,
381            &mut internal_rx,
382            guid,
383            display_name,
384        )
385        .await
386    }
387
388    /// Adds a contact to a specified list, also setting its display name if applicable.
389    pub async fn add_contact(
390        &self,
391        email: &str,
392        display_name: &str,
393        list: MsnpList,
394    ) -> Result<Event, ContactError> {
395        let mut internal_rx = self.internal_tx.subscribe();
396        adc::send(
397            &self.tr_id,
398            &self.ns_tx,
399            &mut internal_rx,
400            email,
401            display_name,
402            list,
403        )
404        .await
405    }
406
407    /// Removes a contact from a specified list (except the forward list, which requires calling
408    /// [remove_contact_from_forward_list][Client::remove_contact_from_forward_list]).
409    pub async fn remove_contact(&self, email: &str, list: MsnpList) -> Result<(), ContactError> {
410        let mut internal_rx = self.internal_tx.subscribe();
411        rem::send(&self.tr_id, &self.ns_tx, &mut internal_rx, email, list).await
412    }
413
414    /// Removes a contact from the forward list.
415    pub async fn remove_contact_from_forward_list(&self, guid: &str) -> Result<(), ContactError> {
416        let mut internal_rx = self.internal_tx.subscribe();
417        rem::send_with_forward_list(&self.tr_id, &self.ns_tx, &mut internal_rx, guid).await
418    }
419
420    /// Blocks a contact.
421    pub async fn block_contact(&self, email: &str) -> Result<(), ContactError> {
422        let mut internal_rx = self.internal_tx.subscribe();
423        adc::send(
424            &self.tr_id,
425            &self.ns_tx,
426            &mut internal_rx,
427            email,
428            email,
429            MsnpList::BlockList,
430        )
431        .await?;
432
433        rem::send(
434            &self.tr_id,
435            &self.ns_tx,
436            &mut internal_rx,
437            email,
438            MsnpList::AllowList,
439        )
440        .await
441    }
442
443    /// Unblocks a contact.
444    pub async fn unblock_contact(&self, email: &str) -> Result<(), ContactError> {
445        let mut internal_rx = self.internal_tx.subscribe();
446        adc::send(
447            &self.tr_id,
448            &self.ns_tx,
449            &mut internal_rx,
450            email,
451            email,
452            MsnpList::AllowList,
453        )
454        .await?;
455
456        rem::send(
457            &self.tr_id,
458            &self.ns_tx,
459            &mut internal_rx,
460            email,
461            MsnpList::BlockList,
462        )
463        .await
464    }
465
466    /// Creates a new contact group.
467    pub async fn create_group(&self, name: &str) -> Result<(), ContactError> {
468        let mut internal_rx = self.internal_tx.subscribe();
469        adg::send(&self.tr_id, &self.ns_tx, &mut internal_rx, name).await
470    }
471
472    /// Deletes a contact group.
473    pub async fn delete_group(&self, guid: &str) -> Result<(), ContactError> {
474        let mut internal_rx = self.internal_tx.subscribe();
475        rmg::send(&self.tr_id, &self.ns_tx, &mut internal_rx, guid).await
476    }
477
478    /// Renames a contact group.
479    pub async fn rename_group(&self, guid: &str, new_name: &str) -> Result<(), ContactError> {
480        let mut internal_rx = self.internal_tx.subscribe();
481        reg::send(&self.tr_id, &self.ns_tx, &mut internal_rx, guid, new_name).await
482    }
483
484    /// Adds a contact to a group.
485    pub async fn add_contact_to_group(
486        &self,
487        guid: &str,
488        group_guid: &str,
489    ) -> Result<(), ContactError> {
490        let mut internal_rx = self.internal_tx.subscribe();
491        adc::send_with_group(&self.tr_id, &self.ns_tx, &mut internal_rx, guid, group_guid).await
492    }
493
494    /// Removes a contact from a group.
495    pub async fn remove_contact_from_group(
496        &self,
497        guid: &str,
498        group_guid: &str,
499    ) -> Result<(), ContactError> {
500        let mut internal_rx = self.internal_tx.subscribe();
501        rem::send_with_group(&self.tr_id, &self.ns_tx, &mut internal_rx, guid, group_guid).await
502    }
503
504    /// Sets the GTC value, which can be either `A` or `N`.
505    pub async fn set_gtc(&self, gtc: &str) -> Result<(), SdkError> {
506        let mut internal_rx = self.internal_tx.subscribe();
507        gtc::send(&self.tr_id, &self.ns_tx, &mut internal_rx, gtc).await
508    }
509
510    /// Sets the BLP value, which can be either `AL` or `BL`.
511    pub async fn set_blp(&self, blp: &str) -> Result<(), SdkError> {
512        let mut internal_rx = self.internal_tx.subscribe();
513        blp::send(&self.tr_id, &self.ns_tx, &mut internal_rx, blp).await
514    }
515
516    /// Creates a new Switchboard session and invites the specified contact to it.
517    pub async fn create_session(&self, email: &str) -> Result<Switchboard, SdkError> {
518        let mut internal_rx = self.internal_tx.subscribe();
519        let switchboard = xfr::send(
520            &self.tr_id,
521            &self.ns_tx,
522            &mut internal_rx,
523            self.user_data.clone(),
524        )
525        .await?;
526
527        let user_data = self.user_data.read().await;
528        let user_email = user_data.email.as_ref().ok_or(SdkError::NotLoggedIn)?;
529
530        switchboard.login(user_email).await?;
531        switchboard.invite(email).await?;
532        Ok(switchboard)
533    }
534
535    /// Sets the user's display picture, returning a standard base64 encoded hash of it.
536    /// This method uses the picture's binary data, and scaling down to a size like 200x200 beforehand is recommended.
537    pub async fn set_display_picture(&self, display_picture: Vec<u8>) -> Result<String, SdkError> {
538        let mut user_data = self.user_data.write().await;
539        let user_email = user_data.email.as_ref().ok_or(SdkError::NotLoggedIn)?;
540
541        let mut hash = sha1_smol::Sha1::new();
542        hash.update(display_picture.as_slice());
543        let sha1d = STANDARD.encode(hash.digest().bytes());
544
545        let sha1c = format!(
546            "Creator{user_email}Size{}Type3LocationPIC.tmpFriendlyAAA=SHA1D{sha1d}",
547            display_picture.len()
548        );
549
550        let mut hash = sha1_smol::Sha1::new();
551        hash.update(sha1c.as_bytes());
552        let sha1c = STANDARD.encode(hash.digest().bytes());
553
554        user_data.msn_object = Some(format!(
555            "<msnobj Creator=\"{user_email}\" Size=\"{}\" Type=\"3\" Location=\"PIC.tmp\" Friendly=\"AAA=\" SHA1D=\"{sha1d}\" SHA1C=\"{sha1c}\"/>",
556            display_picture.len()
557        ));
558
559        user_data.display_picture = Some(display_picture);
560        Ok(sha1d)
561    }
562
563    /// Disconnects from the server.
564    pub async fn disconnect(&self) -> Result<(), SdkError> {
565        let command = "OUT\r\n";
566        trace!("C: {command}");
567
568        self.ns_tx
569            .send(command.as_bytes().to_vec())
570            .await
571            .or(Err(SdkError::TransmittingError))?;
572
573        self.event_tx.close();
574        self.cancellation_token.cancel();
575        Ok(())
576    }
577}