mod error;
mod impls;
pub mod types;
mod utils;
use impls::Returns;
use std::net::{SocketAddr, TcpStream};
use std::sync::mpsc::{Receiver, Sender};
use std::thread::JoinHandle;
use std::time::Duration;
use std::{ops::ControlFlow, thread};
use tungstenite::stream::MaybeTlsStream;
use tungstenite::util::NonBlockingResult;
use tungstenite::{Message, WebSocket, client::IntoClientRequest};
use crate::utils::process_info::get_running_client;
use crate::utils::process_info::{CLIENT_PROCESS_NAME, GAME_PROCESS_NAME};
use crate::ws::types::{Event, EventKind, RequestType};
use crate::ws::utils::EventMap;
pub use error::Error as WebSocketError;
pub type WebSocketStream = WebSocket<MaybeTlsStream<TcpStream>>;
pub struct LcuWebSocket {
ws_sender: Sender<ChannelMessage>,
handle: JoinHandle<()>,
id_free_list: EventMap<(usize, Vec<usize>)>,
}
#[derive(Clone, Copy)]
#[repr(transparent)]
pub struct SubscriberID(usize);
impl SubscriberID {
#[doc(hidden)]
#[must_use]
pub fn inner(&self) -> usize {
self.0
}
#[doc(hidden)]
#[must_use]
pub fn from_inner(inner: usize) -> Self {
Self(inner)
}
}
enum ChannelMessage {
Subscribe(RequestType, EventKind, Box<dyn Subscriber + Send>),
Unsubscribe(SubscriberID, EventKind),
Abort,
}
#[derive(PartialEq, Eq)]
pub enum Flow {
TryReconnect,
Continue,
}
pub enum PoisonBehavior {
Panic,
Break,
Ignore,
Clear,
}
pub trait Subscriber {
fn on_poison(&self) -> PoisonBehavior {
PoisonBehavior::Panic
}
fn on_subscribe(&mut self, _event_kind: &EventKind, _request_code: &RequestType) {}
fn on_event(&mut self, event: &Event, _continues: &mut bool);
fn on_unsubscribe(&mut self, _event_kind: &EventKind) {}
}
pub trait ErrorHandler: Send {
fn on_error(&mut self, error: WebSocketError) -> ControlFlow<(), Flow>;
fn on_connect(&mut self, socket: &mut WebSocketStream) -> Result<(), WebSocketError> {
match socket.get_ref() {
MaybeTlsStream::Plain(_) => unimplemented!("The stream is always encrypted"),
#[cfg(feature = "ws_rustls")]
MaybeTlsStream::Rustls(stream_owned) => {
stream_owned.sock.set_nonblocking(true)?;
}
#[cfg(feature = "ws_nativetls")]
MaybeTlsStream::NativeTls(tls_stream) => {
tls_stream.get_ref().set_nonblocking(true)?;
}
_ => {
unimplemented!("There are no other cases")
}
}
Ok(())
}
fn on_timeout(&mut self) {
thread::sleep(Duration::from_millis(500));
}
}
pub struct DefaultErrorHandler;
impl ErrorHandler for DefaultErrorHandler {
fn on_error(&mut self, error: WebSocketError) -> ControlFlow<(), Flow> {
eprintln!("{error}");
ControlFlow::Break(())
}
}
impl Default for LcuWebSocket {
fn default() -> Self {
Self::new()
}
}
impl LcuWebSocket {
#[must_use]
pub fn new() -> Self {
Self::new_with_error_handler(DefaultErrorHandler)
}
#[must_use]
pub fn new_with_error_handler(error_handler: impl ErrorHandler + 'static) -> Self {
let (ws_sender, ws_receiver) = std::sync::mpsc::channel::<ChannelMessage>();
let handle = thread::spawn(move || {
let tls = crate::tls::connector();
let mut error_handler = error_handler;
let ws_receiver = ws_receiver;
event_loop(&mut error_handler, &ws_receiver, &tls);
});
Self {
ws_sender,
handle,
id_free_list: EventMap::new(),
}
}
pub fn subscribe(
&mut self,
event_kind: EventKind,
subscriber: impl Subscriber + Send + 'static,
) -> Option<SubscriberID> {
let (next_id, returned) = self.id_free_list.get_mut(&event_kind);
self.ws_sender
.send(ChannelMessage::Subscribe(
RequestType::Subscribe,
event_kind,
Box::new(subscriber),
))
.ok()?;
let id = if returned.is_empty() {
let tmp = *next_id;
*next_id += 1;
tmp
} else {
returned.remove(0)
};
Some(SubscriberID(id))
}
pub fn subscribe_closure<R: Returns>(
&mut self,
event_kind: EventKind,
subscribe_closure: impl Fn(&Event) -> R + Send + 'static,
) -> Option<SubscriberID> {
let (next_id, returned) = self.id_free_list.get_mut(&event_kind);
self.ws_sender
.send(ChannelMessage::Subscribe(
RequestType::Subscribe,
event_kind,
Box::new(subscribe_closure),
))
.ok()?;
let id = if returned.is_empty() {
let tmp = *next_id;
*next_id += 1;
tmp
} else {
returned.remove(0)
};
Some(SubscriberID(id))
}
pub fn unsubscribe(&mut self, event_kind: EventKind, id: SubscriberID) -> Option<()> {
let (_, returned) = self.id_free_list.get_mut(&event_kind);
self.ws_sender
.send(ChannelMessage::Unsubscribe(id, event_kind))
.ok()?;
returned.push(id.0);
Some(())
}
#[must_use]
pub fn abort(self) -> Option<()> {
self.ws_sender.send(ChannelMessage::Abort).ok()
}
#[must_use]
pub fn is_finished(&self) -> bool {
self.handle.is_finished()
}
}
pub fn force<R: Returns, F: FnMut(&Event) -> R + Send>(f: F) -> F {
f
}
type SubscriberMap = EventMap<Vec<Option<Box<dyn Subscriber>>>>;
fn event_loop(
error_handler: &mut impl ErrorHandler,
receiver: &Receiver<ChannelMessage>,
tls: &crate::tls::TlsType,
) {
let mut maybe_stream: Option<WebSocketStream> = None;
let mut subscribers = SubscriberMap::new();
let mut control_flow = ControlFlow::Continue(Flow::Continue);
let mut abort = false;
while control_flow.is_continue() {
if let Some(stream) = &mut maybe_stream {
if let Ok(message) = receiver.try_recv() {
let mut ws_message = None;
match message {
ChannelMessage::Subscribe(code, event_kind, mut subscriber) => {
let subscribers = subscribers.get_mut(&event_kind);
if subscribers.is_empty() {
let endpoint_str = event_kind.to_string();
let command = format!("[{}, \"{endpoint_str}\"]", code as u8).into();
ws_message = Some(Message::Text(command));
}
subscriber.on_subscribe(&event_kind, &code);
if let Some(idx) = subscribers.iter().position(Option::is_none) {
subscribers[idx] = Some(subscriber);
} else {
subscribers.push(Some(subscriber));
}
}
ChannelMessage::Unsubscribe(subscriber_id, event_kind) => {
let subscribers = subscribers.get_mut(&event_kind);
let subscriber = &mut subscribers[subscriber_id.0];
if let Some(subscriber) = subscriber {
subscriber.on_unsubscribe(&event_kind);
}
*subscriber = None;
if subscribers.iter().flatten().count() == 0 {
let unsub = format!(
"[{}, \"{}\"]",
RequestType::Unsubscribe as u8,
event_kind.to_string()
)
.into();
ws_message = Some(Message::Text(unsub));
}
}
ChannelMessage::Abort => {
abort = true;
ws_message = Some(Message::Close(None));
}
}
if let Some(Err(e)) = ws_message.map(|m| stream.send(m)) {
control_flow = error_handler.on_error(e.into());
}
if abort {
break;
}
}
if control_flow == ControlFlow::Continue(Flow::Continue) {
control_flow = receive_message(stream, &mut subscribers, error_handler)
.unwrap_or_else(|e| error_handler.on_error(e));
}
} else {
connect(tls, error_handler).map_or_else(
|e| control_flow = error_handler.on_error(e),
|stream| maybe_stream = Some(stream),
);
}
if control_flow == ControlFlow::Continue(Flow::TryReconnect) {
maybe_stream = None;
}
}
if control_flow.is_break() {
let maybe_stream = maybe_stream.filter(WebSocket::can_write);
if let Some(Err(e)) = maybe_stream.map(|mut stream| stream.send(Message::Close(None))) {
let _ = error_handler.on_error(e.into());
}
}
}
fn receive_message(
stream: &mut WebSocketStream,
subscribers: &mut SubscriberMap,
error_handler: &mut impl ErrorHandler,
) -> Result<ControlFlow<(), Flow>, WebSocketError> {
let read = stream
.read()
.no_block()?
.filter(|msg| !msg.is_empty())
.map(Message::into_data);
if let Some(data) = read {
let json = serde_json::from_slice::<Event>(&data)?;
let subscribers = subscribers.get_mut(&json.1);
for subscriber in subscribers.iter_mut().flatten() {
let mut continues = true;
subscriber.on_event(&json, &mut continues);
if !continues {
return Ok(ControlFlow::Break(()));
}
}
} else {
error_handler.on_timeout();
}
Ok(ControlFlow::Continue(Flow::Continue))
}
fn connect(
tls: &crate::tls::TlsType,
error_handler: &mut impl ErrorHandler,
) -> Result<WebSocketStream, WebSocketError> {
const TIMEOUT: Duration = Duration::from_millis(100);
let (addr, auth) = get_running_client(CLIENT_PROCESS_NAME, GAME_PROCESS_NAME, false, None)?;
let str_req = format!("wss://{addr}");
let mut request = str_req.into_client_request()?;
request.headers_mut().insert("Authorization", auth?);
let tcp_stream = TcpStream::connect_timeout(&SocketAddr::V4(addr), TIMEOUT)?;
let (mut stream, _) = tungstenite::client_tls_with_config(
request.clone(),
tcp_stream,
None,
Some(crate::tls::wrap_connector(tls)),
)
.expect("The TLS handshake should never fail");
error_handler.on_connect(&mut stream)?;
Ok(stream)
}