use std::{sync::{Arc, Mutex}, convert::TryInto, thread::JoinHandle};
use wamp_core::{messages::*, serde_json::from_str, tungstenite::client, subscribe, unsubscribe};
use std::thread::spawn;
use wamp_core::{Error, http::Response, tungstenite::{connect, Message}, WampMessage};
use crate::{core::Socket, sync::WampRequest};
use super::events::Events;
/// Handle to a WAMP connection.
///
/// Every field is reference-counted shared state, so cloning a `Client` produces another handle
/// onto the same socket, counters and handler list rather than an independent connection.
#[derive(Clone)]
pub struct Client {
/// Shared WebSocket connection; `connect` stores the stream here wrapped in `Arc<Mutex<_>>`.
pub socket: Socket,
/// Shared counter read by `new_request_id`; initialised to 0 by `connect`.
pub request_id: Arc<Mutex<u64>>,
/// Shared counter reserved for routing ids; initialised to 0 by `connect`.
pub routing_id: Arc<Mutex<u64>>,
/// Registered handlers, each stored as a `(routing id, Events)` pair behind its own mutex.
/// `on` appends to this list, `remove_callbacks` filters it by routing id and `run_events`
/// iterates over it.
pub events: Arc<Mutex<Vec<Arc<Mutex<(u64, Events)>>>>>,
}
impl Client {
/// Opens a WebSocket connection for `request` and wraps it in a new [`Client`].
///
/// The returned client starts with both id counters at 0 and an empty handler list. The
/// handshake response is handed back alongside the client.
///
/// # Errors
/// Returns an error if the underlying `connect` call fails.
pub fn connect<U: ToString, P: ToString>(
    request: WampRequest<U, P>,
) -> Result<(Client, Response<Option<Vec<u8>>>), Error> {
    let (stream, response) = connect(request)?;
    let client = Client {
        socket: Arc::new(Mutex::new(stream)),
        request_id: Arc::new(Mutex::new(0)),
        routing_id: Arc::new(Mutex::new(0)),
        events: Arc::new(Mutex::new(Vec::new())),
    };
    Ok((client, response))
}
/// Registers `event` as a handler tagged with `routing_id`.
///
/// The handler is appended to the shared handler list, so it is visible to every clone of this
/// client.
///
/// # Panics
/// Panics if the events mutex is poisoned.
pub fn on(&self, routing_id: u64, event: Events) {
    self.events
        .lock()
        .expect("Events mutex guard poisoned")
        .push(Arc::new(Mutex::new((routing_id, event))));
}
/// Converts `message` into a WebSocket frame and sends it over the shared socket.
///
/// The socket lock is taken before the conversion and held until the send has completed.
///
/// # Errors
/// Returns an error if the conversion into a frame fails or if the socket send fails.
///
/// # Panics
/// Panics if the socket mutex is poisoned.
pub fn send<T: WampMessage + TryInto<Message>>(&self, message: T) -> Result<(), Error>
where
    Error: From<<T as TryInto<Message>>::Error>,
{
    let mut connection = self
        .socket
        .lock()
        .expect("WebSocket mutex Poisoned during message sending.");
    let frame: Message = message.try_into()?;
    connection.send(frame)?;
    Ok(())
}
/// Allocates the next routing id.
///
/// Increments the shared `routing_id` counter in place while holding its lock and returns the
/// new value, so successive calls on any clone of this client yield 1, 2, 3, ...
///
/// # Panics
/// Panics if the routing id mutex is poisoned.
pub fn new_routing_id(&self) -> u64 {
    let mut counter = self
        .routing_id
        .lock()
        .expect("Routing id mutex poisoned");
    // Mutate through the guard: copying the value out first would only bump a local copy and
    // leave the shared counter unchanged.
    *counter += 1;
    *counter
}
/// Allocates the next request id.
///
/// Increments the shared `request_id` counter in place while holding its lock and returns the
/// new value, so successive calls on any clone of this client yield 1, 2, 3, ...
///
/// # Panics
/// Panics if the request id mutex is poisoned.
pub fn new_request_id(&self) -> u64 {
    let mut counter = self
        .request_id
        .lock()
        .expect("Request id mutex poisoned");
    // Mutate through the guard: copying the value out first would only bump a local copy and
    // leave the shared counter unchanged.
    *counter += 1;
    *counter
}
/// Drops every registered handler whose routing id appears in `routing_ids`.
///
/// Handlers with any other routing id are kept in their original order.
///
/// # Panics
/// Panics if the events mutex or any individual handler mutex is poisoned.
pub fn remove_callbacks(&self, routing_ids: Vec<u64>) {
    let mut registered = self.events.lock().unwrap();
    registered.retain(|entry| {
        // Only the id is copied out; the handler lock is released at the end of this statement.
        let id = entry.lock().unwrap().0;
        !routing_ids.contains(&id)
    });
}
/// Reads and dispatches incoming messages until an error occurs.
///
/// This never returns `Ok`: the loop only ends when `read_then_run_event` fails.
///
/// # Errors
/// Returns the first error produced by `read_then_run_event`.
pub fn event_loop(&mut self) -> Result<(), Error> {
    loop {
        // The join handle is not awaited; dropping it leaves the worker thread running detached.
        let _detached = self.read_then_run_event()?;
    }
}
/// Reads one frame and, if it decoded to a message, dispatches it to the registered handlers.
///
/// Returns `Ok(None)` when `read` produced no message; otherwise returns the message together
/// with the handle of the thread running its handlers.
///
/// # Errors
/// Returns any error from `read` or `run_events`.
pub fn read_then_run_event(&mut self) -> Result<Option<(Messages, JoinHandle<()>)>, Error> {
    self.read()?
        .map(|message| self.run_events(message))
        .transpose()
}
pub fn run_events(&mut self, message: Messages) -> Result<(Messages, JoinHandle<()>), Error> {
let events = (&self.events).clone();
let arc_client = Client::from(self);
macro_rules! run_events {
($events:ident, $value:expr) => {{
let arc_client = arc_client.clone();
let events = events.clone();
Ok((
message,
spawn(move || {
let mut events = events.lock().unwrap();
for event in events.iter_mut() {
let (_, event) = &mut *event.lock().unwrap();
if let Events::$events(callback) = event {
callback(arc_client.clone(), $value.clone());
}
}
}),
))
}};
}
match message.clone() {
Messages::Abort(abort) => run_events!(Abort, abort),
Messages::Challenge(challenge) => run_events!(Challenge, challenge),
Messages::Error(error) => run_events!(Error, error),
Messages::Event(event) => run_events!(Event, event),
Messages::Goodbye(goodbye) => run_events!(Goodbye, goodbye),
Messages::Interrupt(interrupt) => run_events!(Interrupt, interrupt),
Messages::Invocation(invocation) => run_events!(Invocation, invocation),
Messages::Published(published) => run_events!(Published, published),
Messages::Registered(registered) => run_events!(Registered, registered),
Messages::Result(result) => run_events!(Result, result),
Messages::Subscribed(subscribed) => run_events!(Subscribed, subscribed),
Messages::Unregistered(unregistered) => run_events!(Unregistered, unregistered),
Messages::Unsubscribed(unsubscribed) => run_events!(Unsubscribed, unsubscribed),
Messages::Welcome(welcome) => run_events!(Welcome, welcome),
Messages::Extension(extension) => run_events!(Extension, extension),
_ => Err(Error::InvalidFrameReceived(message)),
}
}
/// Reads the next frame from the socket and decodes it if it is a text frame.
///
/// Returns `Ok(Some(message))` for a text frame whose payload deserializes into [`Messages`],
/// and `Ok(None)` for every other kind of frame.
///
/// # Errors
/// Returns an error if reading from the socket fails (for example because the connection was
/// closed) or if the text payload cannot be deserialized.
///
/// # Panics
/// Panics if the socket mutex is poisoned.
pub fn read(&mut self) -> Result<Option<Messages>, Error> {
    // Bind the frame first so the socket lock is released before the payload is decoded.
    // Socket errors are propagated with `?`, the same way `send` reports them.
    let frame = self
        .socket
        .lock()
        .expect("WebSocket mutex poisoned during message reading.")
        .read()?;
    match frame {
        Message::Text(text) => Ok(Some(from_str(&text)?)),
        _ => Ok(None),
    }
}
}
impl From<&Client> for Client {
fn from(value: &Client) -> Self {
Self {
socket: value.socket.clone(),
request_id: value.request_id.clone(),
events: value.events.clone(),
routing_id: value.routing_id.clone()
}
}
}
impl From<&mut Client> for Client {
    /// Builds a new handle that shares the socket, counters and handler list of `value`.
    fn from(value: &mut Client) -> Self {
        // Nothing is mutated, so reborrow immutably and reuse the `&Client` conversion.
        Self::from(&*value)
    }
}