use crate::helper::url::Url;
use crate::location::Location;
use crate::{ApiError, helper};
use futures_util::stream::SplitStream;
use futures_util::{SinkExt, StreamExt, stream::SplitSink};
use serde::{Deserialize, Serialize};
use std::cmp::PartialEq;
use std::future::Future;
use std::sync::Arc;
use tokio::net::TcpStream;
use tokio::sync::Mutex;
use tokio_tungstenite::tungstenite::Utf8Bytes;
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async, tungstenite};
/// Envelope for a single message exchanged over the websocket.
///
/// The wrapped [`Message`] is serialized to, and deserialized from, the JSON
/// key `msg` of the envelope object.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct Event {
/// Payload of the event; stored under the `msg` key on the wire.
#[serde(rename = "msg")]
pub message: Message,
}
impl Event {
/// Creates an [`Event`] that wraps the given `message`.
#[must_use]
pub const fn new(message: Message) -> Self {
Self { message }
}
}
/// Payload variants carried by an [`Event`].
///
/// The enum uses serde's internally tagged representation: the variant name is
/// read from (and written to) a `msg` field of the payload object itself.
/// Every known variant keeps its body as an untyped [`serde_json::Value`].
// NOTE(review): the exact shape of each payload is not visible here; consult
// the upstream API before relying on specific keys.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "msg")]
#[allow(missing_docs)]
#[non_exhaustive]
pub enum Message {
SubscriptionTopicsInfo(serde_json::Value),
DeviceInfoSet(serde_json::Value),
SessionInfo(serde_json::Value),
DataUpdate(serde_json::Value),
/// Fallback selected by serde when the tag matches none of the variants
/// above (`#[serde(other)]`).
#[serde(other)]
Unknown,
}
impl TryFrom<Event> for tungstenite::protocol::Message {
    type Error = serde_json::Error;

    /// Serializes `event` to JSON and wraps the result in a websocket text
    /// message.
    ///
    /// # Errors
    ///
    /// Returns the [`serde_json::Error`] produced when `event` cannot be
    /// serialized.
    fn try_from(event: Event) -> Result<Self, Self::Error> {
        let payload = serde_json::to_string(&event)?;
        // Pass the owned `String` so its buffer is moved into the message;
        // converting from `&String` would copy the freshly built JSON again.
        Ok(Self::Text(Utf8Bytes::from(payload)))
    }
}
/// An established websocket connection, held as separate read and write
/// halves of the same underlying [`WebSocketStream`].
#[derive(Debug)]
pub struct Connection {
/// Read half: yields incoming websocket messages.
stream: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
/// Write half: accepts outgoing websocket messages.
sink: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>,
}
impl Connection {
#[must_use]
pub(crate) fn new(stream: WebSocketStream<MaybeTlsStream<TcpStream>>) -> Self {
let (sink, stream) = stream.split();
Self { stream, sink }
}
#[must_use]
pub async fn next(&mut self) -> Option<Result<Event, ApiError>> {
while let Some(message) = self.stream.next().await {
match message {
Ok(message) => {
if let tungstenite::protocol::Message::Ping(_) = message {
log::debug!("Recieved ping message from Ring");
continue;
}
let event = serde_json::from_str::<Event>(&message.to_string())
.map_err(ApiError::InvalidResponse);
if let Err(error) = event {
log::error!("Error deserializing message: {:?}", error);
return Some(Err(error));
}
log::debug!("Received event: {:?}", event);
return Some(event);
}
Err(error) => {
log::error!("Error receiving message: {:?}", error);
return Some(Err(ApiError::WebsocketError(error)));
}
}
}
None
}
pub async fn send(&mut self, event: Event) -> Result<(), ApiError> {
self.sink
.send(event.try_into()?)
.await
.map_err(ApiError::WebsocketError)
}
pub async fn close(self) {
let stream = self.stream.reunite(self.sink);
match stream {
Ok(mut stream) => {
let closed = stream.close(None).await;
if let Err(error) = closed {
log::error!("Error closing stream: {:?}", error);
return;
}
log::info!("Shut down Websocket connection gracefully");
}
Err(_) => {
log::info!("Unable to reunite write and read pair into stream");
}
}
}
}
/// Couples a websocket [`Connection`] with the [`Location`] it was opened
/// for, so events can be dispatched together with their location.
#[derive(Debug)]
pub struct Listener<'a> {
/// Location handed to the event handler alongside every event.
location: &'a Location<'a>,
/// Websocket connection the events are read from and written to.
connection: Connection,
}
impl<'a> Listener<'a> {
    /// Builds a listener for `location` on top of an already established
    /// websocket `stream`.
    #[must_use]
    pub fn new<'b>(
        location: &'b Location<'_>,
        stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
    ) -> Listener<'b> {
        let connection = Connection::new(stream);
        Listener {
            location,
            connection,
        }
    }

    /// Runs the receive loop, handing every recognised event to `on_event`.
    ///
    /// The handler receives the event, the listener's location and a shared,
    /// mutex-guarded handle to the connection. It resolves to `Ok(true)` to
    /// keep listening or `Ok(false)` to stop.
    ///
    /// Events whose message is [`Message::Unknown`] are logged and skipped, as
    /// are receive errors. The loop also ends when the websocket stream ends.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by `on_event`.
    pub async fn listen<EventHandler, EventHandlerFut, E>(
        &'a mut self,
        on_event: EventHandler,
    ) -> Result<(), E>
    where
        EventHandler:
            Fn(Event, &'a Location<'a>, Arc<Mutex<&'a mut Connection>>) -> EventHandlerFut,
        EventHandlerFut: Future<Output = Result<bool, E>>,
    {
        let shared = Arc::new(Mutex::new(&mut self.connection));
        loop {
            // Hold the lock only while waiting for the next event so the
            // handler can lock the connection itself.
            let received = {
                let mut guard = shared.lock().await;
                guard.next().await
            };

            let Some(outcome) = received else {
                log::info!("Websocket stream closed, stopping listener");
                return Ok(());
            };

            let event = match outcome {
                Ok(event) => event,
                Err(error) => {
                    log::error!("Error receiving event: {:?}", error);
                    continue;
                }
            };

            if matches!(event.message, Message::Unknown) {
                log::warn!("Unknown message received: {:?}", event.message);
                continue;
            }

            log::debug!("Received event: {:?}", event);
            let keep_listening = on_event(event, self.location, Arc::clone(&shared)).await?;
            if !keep_listening {
                log::debug!("Event handler returned false, stopping listener");
                return Ok(());
            }
        }
    }

    /// Sends `event` over the listener's connection.
    ///
    /// # Errors
    ///
    /// Returns whatever [`Connection::send`] reports.
    pub async fn send(&mut self, event: Event) -> Result<(), ApiError> {
        let connection = &mut self.connection;
        connection.send(event).await
    }

    /// Consumes the listener and closes its connection.
    pub async fn close(self) {
        let Self { connection, .. } = self;
        connection.close().await;
    }
}
impl<'a> Location<'a> {
    /// Opens a websocket for this location and wraps it in a [`Listener`].
    ///
    /// # Errors
    ///
    /// Returns the [`ApiError`] produced while fetching the ticket or while
    /// establishing the websocket connection.
    pub async fn get_listener(&'a self) -> Result<Listener<'a>, ApiError> {
        // The handshake response is not needed once the stream is up.
        self.connect()
            .await
            .map(|(stream, _response)| Listener::new(self, stream))
    }

    /// Requests a ticket for this location and connects to the websocket
    /// endpoint built from the ticket's host and id.
    ///
    /// On success returns the connected stream together with the handshake
    /// response.
    async fn connect(
        &self,
    ) -> Result<
        (
            WebSocketStream<MaybeTlsStream<TcpStream>>,
            tungstenite::handshake::client::Response,
        ),
        ApiError,
    > {
        let ticket = self.session.get_ticket(&self.data.id).await?;

        let endpoint = Url::Websocket {
            host: &ticket.host,
            auth_code: &ticket.id,
        };
        let request = helper::url::get_base_url(&endpoint).into_client_request()?;

        let handshake = connect_async(request).await?;
        Ok(handshake)
    }
}