use crate::{Pod, WebSocket};
use poll_channel::*;
use std::thread;
use std::thread::JoinHandle;
use std::time::Duration;
use ws::{CloseCode, Error, ErrorKind, Handler, Handshake, Message, Result, Sender};
pub struct Client {
thread: Option<JoinHandle<()>>,
radio: Option<Sender>,
sender: Option<Sender>,
ws: Option<WebSocket>,
tx: EventSender,
rx: EventReceiver,
run: std::sync::mpsc::Sender<bool>,
exit: Option<std::sync::mpsc::Receiver<bool>>,
mover: std::sync::mpsc::Sender<Sender>,
connector: std::sync::mpsc::Receiver<Sender>,
address: String,
}
type EventReceiver = poll_channel::Receiver<Event>;
type EventSender = poll_channel::Sender<Event>;
pub enum Event {
Open(Sender),
Close,
Text(String),
Binary(Vec<u8>),
Timeout,
Error(Error),
}
impl Client {
pub fn new() -> Self {
let (tx, rx) = poll_channel::channel();
let (run, exit) = std::sync::mpsc::channel();
let (mover, connector) = std::sync::mpsc::channel();
Self {
thread: None,
radio: None,
sender: None,
ws: None,
tx,
rx,
run,
exit: Some(exit),
mover,
connector,
address: String::new(),
}
}
pub fn connect(&mut self, url: &str) -> Pod {
let to = url::Url::parse(url)?;
self.address = url.into();
let tx = self.tx.clone();
let mover = self.mover.clone();
let exit = self.exit.take().unwrap();
let thread = thread::spawn(move || {
loop {
let inner = tx.clone();
let mut socket = ws::Builder::new()
.build(move |sender| WebSocketHandler {
ws: sender,
tx: inner.clone(),
})
.unwrap();
let radio = socket.broadcaster();
let _ = mover.send(radio);
let _ = socket.connect(to.clone());
let _ = socket.run();
if let Ok(_) = exit.recv_timeout(Duration::from_secs(3)) {
break;
}
}
});
self.thread = Some(thread);
Ok(())
}
pub fn is_open(&self) -> bool {
self.sender.is_some()
}
pub fn recv(&mut self, timeout: f32) -> Event {
let n = Duration::from_nanos((timeout as f64 * 1e9) as u64);
if let Ok(radio) = self.connector.try_recv() {
self.radio = Some(radio);
}
match self.rx.recv_timeout(n) {
Ok(event) => {
if let Event::Open(sender) = &event {
self.sender = Some(sender.clone());
}
return event;
}
Err(error) => match error {
RecvTimeoutError::Timeout => return Event::Timeout,
RecvTimeoutError::Disconnected => {
self.radio = None;
self.sender = None;
return Event::Error(Error::new(ErrorKind::Internal, error.to_string()));
}
},
};
}
pub fn send<M>(&self, msg: M) -> Pod
where
M: Into<ws::Message>,
{
if let Some(sender) = &self.sender {
Ok(sender.send(msg)?)
} else {
Err(anyhow::anyhow!("websocket not connected"))
}
}
pub fn close(&self) {
if let Some(radio) = &self.radio {
radio.shutdown().ok();
}
}
pub fn process<F: crate::Handler>(&mut self, handler: &mut F, timeout: f32) -> Pod {
loop {
match self.recv(timeout) {
Event::Open(sender) => {
self.sender = Some(sender.clone());
let ws = WebSocket {
id: 0,
sender,
address: self.address.clone(),
};
self.ws = Some(ws);
handler.on_open(self.ws.as_mut().unwrap())?;
}
Event::Close => {
handler.on_close(self.ws.as_mut().unwrap())?;
}
Event::Text(msg) => {
handler.on_message(self.ws.as_mut().unwrap(), msg)?;
}
Event::Binary(buf) => {
handler.on_binary(self.ws.as_mut().unwrap(), buf)?;
}
Event::Timeout => {
handler.on_timeout()?;
break;
}
Event::Error(error) => {
handler.on_error(error)?;
break;
}
}
}
Ok(())
}
}
impl Drop for Client {
fn drop(&mut self) {
if let Some(radio) = &self.radio {
radio.shutdown().ok();
}
self.run.send(false).ok();
if let Some(thread) = self.thread.take() {
thread.join().ok();
}
}
}
pub fn connect(url: &str) -> anyhow::Result<Client> {
let mut client = Client::new();
client.connect(url)?;
Ok(client)
}
struct WebSocketHandler {
ws: Sender,
tx: EventSender,
}
impl Handler for WebSocketHandler {
fn on_open(&mut self, _: Handshake) -> Result<()> {
let _ = self.tx.send(Event::Open(self.ws.clone()));
Ok(())
}
fn on_message(&mut self, msg: Message) -> Result<()> {
match msg {
Message::Text(x) => {
let _ = self.tx.send(Event::Text(x));
}
Message::Binary(x) => {
let _ = self.tx.send(Event::Binary(x));
}
}
Ok(())
}
fn on_close(&mut self, _code: CloseCode, _reason: &str) {
let _ = self.tx.send(Event::Close);
}
fn on_error(&mut self, err: Error) {
let _ = self.tx.send(Event::Error(err));
}
}
impl Pollable for Client {
fn signal(&self) -> ArcMutex2<OptionSignal> {
self.rx.signal().clone()
}
fn id(&self) -> i32 {
self.rx.id()
}
}