use crate::MsnObject;
use crate::enums::event::Event;
use crate::enums::internal_event::InternalEvent;
use crate::enums::msnp_list::MsnpList;
use crate::enums::msnp_status::MsnpStatus;
use crate::errors::contact_error::ContactError;
use crate::errors::sdk_error::SdkError;
#[cfg(feature = "uniffi")]
use crate::event_handler::EventHandler;
#[cfg(feature = "config")]
use crate::http::config::Config;
use crate::http::http_client::HttpClient;
use crate::models::personal_message::PersonalMessage;
use crate::models::presence::Presence;
use crate::models::user_data::UserData;
use crate::notification_server::commands::{
adc, adg, blp, chg, cvr, gcf, gtc, prp, reg, rem, rmg, sbp, syn, usr_i, usr_s, uux, ver, xfr,
};
use crate::notification_server::event_matcher::{into_event, into_internal_event};
use crate::receive_split::receive_split;
use crate::switchboard_server::switchboard::Switchboard;
use base64::{Engine as _, engine::general_purpose::STANDARD};
use core::str;
use log::{error, trace};
use std::sync::Arc;
use std::sync::atomic::AtomicU32;
use std::time::Duration;
use tokio::io::AsyncWriteExt;
use tokio::net::{TcpStream, lookup_host};
use tokio::sync::{RwLock, broadcast, mpsc};
use tokio_util::sync::CancellationToken;
/// Handle to one notification-server connection.
///
/// Built by `Client::new`, which opens the TCP connection and spawns the
/// reader/writer tasks that sit behind the channels stored here.
pub struct Client {
/// Sending half of the public event stream; cloned into the background tasks.
event_tx: async_channel::Sender<Event>,
/// Receiving half of the public event stream; cloned for each registered handler.
event_rx: async_channel::Receiver<Event>,
/// Queue of raw command bytes; the writer task spawned in `new` writes them to the socket.
ns_tx: mpsc::Sender<Vec<u8>>,
/// Broadcast channel on which the reader task publishes every parsed `InternalEvent`.
internal_tx: broadcast::Sender<InternalEvent>,
/// Counter, starting at 0, handed to every command sender
/// (presumably the protocol transaction id — confirm in the command modules).
tr_id: AtomicU32,
/// State of the logged-in user (email, MSN object, display picture), shared with spawned tasks.
user_data: Arc<RwLock<UserData>>,
/// HTTP client used for the passport token request and, with the `config` feature, the config request.
http_client: HttpClient,
/// Token cancelled when the connection ends; the background tasks watch it to stop.
cancellation_token: CancellationToken,
}
impl Client {
pub async fn new(server: &str, port: u16) -> Result<Self, SdkError> {
let mut server_ips = lookup_host((server, port))
.await
.or(Err(SdkError::ResolutionError))?;
let server_ip = server_ips
.find(|ip| ip.is_ipv4())
.ok_or(SdkError::ResolutionError)?
.ip();
let (event_tx, event_rx) = async_channel::unbounded();
let (ns_tx, mut ns_rx) = mpsc::channel::<Vec<u8>>(256);
let (internal_tx, _) = broadcast::channel::<InternalEvent>(256);
let socket = TcpStream::connect((server_ip, port))
.await
.or(Err(SdkError::ServerError))?;
let (mut rd, mut wr) = socket.into_split();
let task_internal_tx = internal_tx.clone();
let task_event_tx = event_tx.clone();
let cancellation_token = CancellationToken::new();
let task_cancellation_token = cancellation_token.clone();
tokio::spawn(async move {
'outer: while let Ok(messages) =
receive_split(&mut rd, task_cancellation_token.clone()).await
{
for message in messages {
let internal_event = into_internal_event(&message);
if let Err(error) = task_internal_tx.send(internal_event) {
error!("{error}");
}
let event = into_event(&message);
if let Some(event) = event {
let disconnected =
matches!(event, Event::Disconnected | Event::LoggedInAnotherDevice);
if let Err(error) = task_event_tx.send(event).await {
error!("{error}");
break 'outer;
}
if disconnected {
task_event_tx.close();
break 'outer;
}
}
}
}
if let Err(error) = task_event_tx.send(Event::Disconnected).await {
error!("{error}");
}
task_event_tx.close();
task_cancellation_token.cancel();
});
let task_event_tx = event_tx.clone();
let task_cancellation_token = cancellation_token.clone();
tokio::spawn(async move {
loop {
tokio::select! {
message = ns_rx.recv() => {
if let Some(message) = message {
if let Err(error) = wr.write_all(&message).await {
error!("{error}")
}
} else {
break;
}
}
_ = task_cancellation_token.cancelled() => {
break;
}
}
}
if let Err(error) = task_event_tx.send(Event::Disconnected).await {
error!("{error}");
}
task_event_tx.close();
task_cancellation_token.cancel();
});
Ok(Self {
event_tx,
event_rx,
ns_tx,
internal_tx,
tr_id: AtomicU32::new(0),
user_data: Arc::new(RwLock::new(UserData::new())),
http_client: HttpClient::new(),
cancellation_token,
})
}
/// Spawns the keep-alive task.
///
/// The task queues `PNG`, waits for a `QNG <seconds>` server reply, sleeps for that many
/// seconds and repeats. It stops when the command queue is closed, when the token is
/// cancelled (also while sleeping), or when the `QNG` reply carries no interval greater
/// than 5. On exit it emits `Event::Disconnected` (unless the event channel is already
/// closed), closes the event channel and cancels the token.
fn start_pinging(&self) {
    let event_tx = self.event_tx.clone();
    let ns_tx = self.ns_tx.clone();
    let mut internal_rx = self.internal_tx.subscribe();
    let task_cancellation_token = self.cancellation_token.clone();
    tokio::spawn(async move {
        let command = "PNG\r\n";
        'outer: while ns_tx.send(command.as_bytes().to_vec()).await.is_ok() {
            trace!("C: {command}");

            // Wait for the matching QNG and extract the interval until the next ping.
            let interval = loop {
                tokio::select! {
                    reply = internal_rx.recv() => {
                        if let Ok(InternalEvent::ServerReply(reply)) = reply {
                            trace!("S: {reply}");
                            let mut args = reply.split_ascii_whitespace();
                            if args.next() == Some("QNG") {
                                match args.next().and_then(|arg| arg.parse::<u64>().ok()) {
                                    Some(seconds) if seconds > 5 => {
                                        break Duration::from_secs(seconds);
                                    }
                                    _ => {
                                        break 'outer;
                                    }
                                }
                            }
                        }
                    }
                    _ = task_cancellation_token.cancelled() => {
                        break 'outer;
                    }
                }
            };

            // Race the pause against cancellation so the task does not outlive a
            // disconnect by up to a full ping interval.
            tokio::select! {
                _ = tokio::time::sleep(interval) => {}
                _ = task_cancellation_token.cancelled() => {
                    break 'outer;
                }
            }
        }

        // The channel is normally closed already when another task ended first; sending
        // then would only produce a spurious error log.
        if !event_tx.is_closed() {
            if let Err(error) = event_tx.send(Event::Disconnected).await {
                error!("{error}");
            }
        }
        event_tx.close();
        task_cancellation_token.cancel();
    });
}
/// Spawns the task that answers incoming switchboard invitations.
///
/// For every `InternalEvent::SwitchboardInvitation` the task connects a `Switchboard`,
/// answers the session with the logged-in user's email and, on success, emits
/// `Event::SessionAnswered`. Failures to connect or answer are skipped silently. The task
/// ends when the token is cancelled or the event channel rejects the event.
fn handle_switchboard_invitations(&self) {
    let event_tx = self.event_tx.clone();
    let mut internal_rx = self.internal_tx.subscribe();
    let user_data = self.user_data.clone();
    let task_cancellation_token = self.cancellation_token.clone();
    tokio::spawn(async move {
        loop {
            tokio::select! {
                event = internal_rx.recv() => {
                    let Ok(InternalEvent::SwitchboardInvitation {
                        server,
                        port,
                        session_id,
                        cki_string,
                    }) = event
                    else {
                        continue;
                    };

                    let Ok(switchboard) = Switchboard::new(
                        server.as_str(),
                        port.as_str(),
                        cki_string.as_str(),
                        user_data.clone(),
                    )
                    .await
                    else {
                        continue;
                    };

                    // Copy the email out so the read guard is released at the end of this
                    // statement. Keeping it across the awaits below would block writers
                    // for a network round trip and could deadlock if the switchboard
                    // (which shares this lock) locks it again while a writer is queued.
                    let user_email = user_data.read().await.email.clone();
                    let Some(user_email) = user_email else {
                        continue;
                    };

                    if switchboard.answer(&user_email, &session_id).await.is_err() {
                        continue;
                    }

                    if let Err(error) = event_tx
                        .send(Event::SessionAnswered(Arc::new(switchboard)))
                        .await
                    {
                        error!("{error}");
                        break;
                    }
                }
                _ = task_cancellation_token.cancelled() => {
                    break;
                }
            }
        }
    });
}
/// Registers an async closure that is called for each event it receives.
///
/// A task is spawned that pulls events from a clone of the event receiver and awaits
/// `f(event)` for each one, until the channel is closed.
///
/// NOTE(review): every handler reads from a clone of the same `async_channel` receiver;
/// with several handlers registered each event presumably reaches only one of them —
/// confirm this is intended.
pub fn add_event_handler_closure<F, R>(&self, f: F)
where
    F: Fn(Event) -> R + Send + 'static,
    R: Future<Output = ()> + Send,
{
    let events = self.event_rx.clone();
    tokio::spawn(async move {
        loop {
            let Ok(event) = events.recv().await else {
                break;
            };
            f(event).await;
        }
    });
}
/// Registers an `EventHandler` object that is called for each event it receives.
///
/// A task is spawned that pulls events from a clone of the event receiver and awaits
/// `handler.handle(event)` for each one, until the channel is closed.
#[cfg(feature = "uniffi")]
pub fn add_event_handler(&self, handler: Arc<dyn EventHandler>) {
    let events = self.event_rx.clone();
    tokio::spawn(async move {
        loop {
            let Ok(event) = events.recv().await else {
                break;
            };
            handler.handle(event).await;
        }
    });
}
pub async fn login(
&self,
email: String,
password: &str,
nexus_url: &str,
client_name: &str,
version: &str,
) -> Result<Event, SdkError> {
let mut internal_rx = self.internal_tx.subscribe();
ver::send(&self.tr_id, &self.ns_tx, &mut internal_rx).await?;
cvr::send(
&self.tr_id,
&self.ns_tx,
&mut internal_rx,
&email,
client_name,
version,
)
.await?;
let authorization_string =
match usr_i::send(&self.tr_id, &self.ns_tx, &mut internal_rx, &email).await? {
InternalEvent::GotAuthorizationString(authorization_string) => authorization_string,
InternalEvent::RedirectedTo { server, port } => {
return Ok(Event::RedirectedTo { server, port });
}
_ => return Err(SdkError::CouldNotGetAuthenticationString),
};
let token = self
.http_client
.get_passport_token(&email, password, nexus_url, &authorization_string)
.await?;
usr_s::send(&self.tr_id, &self.ns_tx, &mut internal_rx, &token).await?;
{
let mut user_data = self.user_data.write().await;
user_data.email = Some(email);
}
syn::send(&self.tr_id, &self.ns_tx, &mut internal_rx).await?;
gcf::send(&self.tr_id, &self.ns_tx, &mut internal_rx).await?;
self.handle_switchboard_invitations();
self.start_pinging();
Ok(Event::Authenticated)
}
/// Fetches the configuration from `config_url` through the HTTP client.
///
/// # Errors
///
/// Any failure of the request is reported as `SdkError::ConfigRequestError`.
#[cfg(feature = "config")]
pub async fn get_config(&self, config_url: &str) -> Result<Config, SdkError> {
    let response = self.http_client.get_config(config_url).await;
    response.map_err(|_| SdkError::ConfigRequestError)
}
/// Sets the user's presence status through `chg::send`.
///
/// The currently stored MSN object, if any, is passed along with the new status.
///
/// # Errors
///
/// Returns the error reported by `chg::send`.
pub async fn set_presence(&self, presence: MsnpStatus) -> Result<(), SdkError> {
    let mut replies = self.internal_tx.subscribe();
    let presence = Presence::new_without_object(presence);
    // The read guard stays alive until the command has completed because the MSN
    // object is borrowed from it.
    let profile = self.user_data.read().await;
    let msn_object = profile.msn_object.as_deref();
    chg::send(&self.tr_id, &self.ns_tx, &mut replies, &presence, msn_object).await
}
pub async fn set_personal_message(
&self,
personal_message: &PersonalMessage,
) -> Result<(), SdkError> {
let mut internal_rx = self.internal_tx.subscribe();
uux::send(&self.tr_id, &self.ns_tx, &mut internal_rx, personal_message).await
}
pub async fn set_display_name(&self, display_name: &str) -> Result<(), SdkError> {
let mut internal_rx = self.internal_tx.subscribe();
prp::send(&self.tr_id, &self.ns_tx, &mut internal_rx, display_name).await
}
pub async fn set_contact_display_name(
&self,
guid: &str,
display_name: &str,
) -> Result<(), ContactError> {
let mut internal_rx = self.internal_tx.subscribe();
sbp::send(
&self.tr_id,
&self.ns_tx,
&mut internal_rx,
guid,
display_name,
)
.await
}
pub async fn add_contact(
&self,
email: &str,
display_name: &str,
list: MsnpList,
) -> Result<Event, ContactError> {
let mut internal_rx = self.internal_tx.subscribe();
adc::send(
&self.tr_id,
&self.ns_tx,
&mut internal_rx,
email,
display_name,
list,
)
.await
}
pub async fn remove_contact(&self, email: &str, list: MsnpList) -> Result<(), ContactError> {
let mut internal_rx = self.internal_tx.subscribe();
rem::send(&self.tr_id, &self.ns_tx, &mut internal_rx, email, list).await
}
pub async fn remove_contact_from_forward_list(&self, guid: &str) -> Result<(), ContactError> {
let mut internal_rx = self.internal_tx.subscribe();
rem::send_with_forward_list(&self.tr_id, &self.ns_tx, &mut internal_rx, guid).await
}
pub async fn block_contact(&self, email: &str) -> Result<(), ContactError> {
let mut internal_rx = self.internal_tx.subscribe();
adc::send(
&self.tr_id,
&self.ns_tx,
&mut internal_rx,
email,
email,
MsnpList::BlockList,
)
.await?;
rem::send(
&self.tr_id,
&self.ns_tx,
&mut internal_rx,
email,
MsnpList::AllowList,
)
.await
}
pub async fn unblock_contact(&self, email: &str) -> Result<(), ContactError> {
let mut internal_rx = self.internal_tx.subscribe();
adc::send(
&self.tr_id,
&self.ns_tx,
&mut internal_rx,
email,
email,
MsnpList::AllowList,
)
.await?;
rem::send(
&self.tr_id,
&self.ns_tx,
&mut internal_rx,
email,
MsnpList::BlockList,
)
.await
}
pub async fn create_group(&self, name: &str) -> Result<(), ContactError> {
let mut internal_rx = self.internal_tx.subscribe();
adg::send(&self.tr_id, &self.ns_tx, &mut internal_rx, name).await
}
pub async fn delete_group(&self, guid: &str) -> Result<(), ContactError> {
let mut internal_rx = self.internal_tx.subscribe();
rmg::send(&self.tr_id, &self.ns_tx, &mut internal_rx, guid).await
}
pub async fn rename_group(&self, guid: &str, new_name: &str) -> Result<(), ContactError> {
let mut internal_rx = self.internal_tx.subscribe();
reg::send(&self.tr_id, &self.ns_tx, &mut internal_rx, guid, new_name).await
}
pub async fn add_contact_to_group(
&self,
guid: &str,
group_guid: &str,
) -> Result<(), ContactError> {
let mut internal_rx = self.internal_tx.subscribe();
adc::send_with_group(&self.tr_id, &self.ns_tx, &mut internal_rx, guid, group_guid).await
}
pub async fn remove_contact_from_group(
&self,
guid: &str,
group_guid: &str,
) -> Result<(), ContactError> {
let mut internal_rx = self.internal_tx.subscribe();
rem::send_with_group(&self.tr_id, &self.ns_tx, &mut internal_rx, guid, group_guid).await
}
pub async fn set_gtc(&self, gtc: &str) -> Result<(), SdkError> {
let mut internal_rx = self.internal_tx.subscribe();
gtc::send(&self.tr_id, &self.ns_tx, &mut internal_rx, gtc).await
}
pub async fn set_blp(&self, blp: &str) -> Result<(), SdkError> {
let mut internal_rx = self.internal_tx.subscribe();
blp::send(&self.tr_id, &self.ns_tx, &mut internal_rx, blp).await
}
/// Opens a switchboard session and invites `email` to it.
///
/// A switchboard is obtained through `xfr::send`, logged into with the current user's
/// email, and the contact is then invited.
///
/// # Errors
///
/// * `SdkError::NotLoggedIn` if no email is stored in the user data.
/// * Any error reported by `xfr::send`, `Switchboard::login` or `Switchboard::invite`.
pub async fn create_session(&self, email: &str) -> Result<Switchboard, SdkError> {
    let mut internal_rx = self.internal_tx.subscribe();
    let switchboard = xfr::send(
        &self.tr_id,
        &self.ns_tx,
        &mut internal_rx,
        self.user_data.clone(),
    )
    .await?;

    // Copy the email out so the read guard is released at the end of this statement.
    // Keeping it across the awaits below would block writers for two network round
    // trips and could deadlock if the switchboard (which shares this lock) locks it
    // again while a writer is queued.
    let user_email = self
        .user_data
        .read()
        .await
        .email
        .clone()
        .ok_or(SdkError::NotLoggedIn)?;

    switchboard.login(&user_email).await?;
    switchboard.invite(email).await?;
    Ok(switchboard)
}
/// Stores `display_picture` as the user's display picture and builds its MSN object.
///
/// Two Base64-encoded SHA-1 digests are computed: `sha1d` over the picture bytes and
/// `sha1c` over a string built from the MSN object's fields. The serialized MSN object
/// and the picture are then saved in the shared user data.
///
/// # Returns
///
/// The Base64-encoded SHA-1 digest of the picture (`sha1d`).
///
/// # Errors
///
/// * `SdkError::NotLoggedIn` if no email is stored in the user data.
/// * `SdkError::CouldNotCreateMsnObject` if the MSN object cannot be serialized; nothing
///   is stored in that case.
pub async fn set_display_picture(&self, display_picture: Vec<u8>) -> Result<String, SdkError> {
    // Base64 of the SHA-1 digest of `bytes`.
    let sha1_base64 = |bytes: &[u8]| -> String {
        let mut hasher = sha1_smol::Sha1::new();
        hasher.update(bytes);
        STANDARD.encode(hasher.digest().bytes())
    };

    let mut profile = self.user_data.write().await;
    let user_email = profile.email.as_ref().ok_or(SdkError::NotLoggedIn)?;

    let sha1d = sha1_base64(display_picture.as_slice());
    let sha1c_input = format!(
        "Creator{user_email}Size{}Type3LocationPIC.tmpFriendlyAAA=SHA1D{sha1d}",
        display_picture.len()
    );
    let sha1c = sha1_base64(sha1c_input.as_bytes());

    let msn_object = MsnObject {
        creator: user_email.clone(),
        size: display_picture.len() as u32,
        object_type: 3,
        location: "PIC.tmp".to_string(),
        friendly: "AAA=".to_string(),
        sha1d: sha1d.clone(),
        sha1c: Some(sha1c),
        content_type: None,
    };

    // Serialize first so a failure leaves the stored data untouched.
    let serialized = quick_xml::se::to_string(&msn_object)
        .map_err(|_| SdkError::CouldNotCreateMsnObject)?;
    profile.msn_object = Some(serialized);
    profile.display_picture = Some(display_picture);
    Ok(sha1d)
}
/// Disconnects from the notification server.
///
/// Queues the `OUT` command, then closes the event channel and cancels the token
/// watched by the background tasks.
///
/// # Errors
///
/// Returns `SdkError::TransmittingError` if the command cannot be queued; in that case
/// the event channel and the token are left untouched.
pub async fn disconnect(&self) -> Result<(), SdkError> {
    let command = "OUT\r\n";
    trace!("C: {command}");

    let queued = self.ns_tx.send(command.as_bytes().to_vec()).await;
    if queued.is_err() {
        return Err(SdkError::TransmittingError);
    }

    self.event_tx.close();
    self.cancellation_token.cancel();
    Ok(())
}
}