use std::sync::Arc;
use std::time::Duration;
use futures_util::StreamExt;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::sync::{broadcast, mpsc, watch, Notify};
use tokio::time::sleep;
use wreq::websocket::{Message, WebSocket};
use crate::auth::AuthState;
use crate::error::GrindrError;
use crate::headers::GrindrHeaders;
use crate::rest::InnerClient;
/// WebSocket endpoint that `connect_and_run` sends its upgrade request to.
const WS_URL: &str = "wss://grindr.mobi/v1/ws";
/// Capacity handed to `broadcast::channel` for the event channel in `make_channels`.
const WS_BROADCAST_CAPACITY: usize = 256;
/// A command queued for sending over the WebSocket.
///
/// `run_message_loop` writes each command as a JSON object with the keys
/// `type`, `ref`, `token` and `payload`; `token` is filled in by the loop from
/// the session id, not from this struct.
///
/// NOTE(review): the derived `Serialize` would emit `ref_id`, while the message
/// loop hand-builds the object with the key `ref` — confirm which form other
/// callers rely on.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WsCommand {
/// Value written to the outgoing `type` key.
pub r#type: String,
/// Value written to the outgoing `ref` key.
pub ref_id: String,
/// JSON value written to the outgoing `payload` key.
pub payload: Value,
}
/// An incoming WebSocket text message that parsed as JSON and carried a string
/// `type` field, as broadcast by `run_message_loop`.
#[derive(Debug, Clone)]
pub struct WsEvent {
/// Copy of the string found under the message's `type` key.
pub event_type: String,
/// The complete parsed JSON message (it still contains the `type` key).
pub payload: Value,
}
/// Connection status published through the `watch` channel created in
/// `make_channels`.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum WsConnectionState {
/// Initial value of the watch channel; also published by `spawn_ws_task`
/// every time `connect_and_run` returns.
#[default]
Disconnected,
/// Published by `connect_and_run` once the WebSocket upgrade has succeeded.
Connected,
}
/// Sending halves used by the WebSocket task to publish events and state.
pub(crate) struct WsChannels {
/// Broadcast sender that `run_message_loop` pushes parsed `WsEvent`s into.
pub event_tx: broadcast::Sender<WsEvent>,
/// Watch sender carrying the current `WsConnectionState`.
pub state_tx: watch::Sender<WsConnectionState>,
}
/// Creates the channels that connect the WebSocket task to the rest of the crate.
///
/// Returns the sender halves the task publishes through (`WsChannels`) together
/// with the command queue ends and the state receiver (`WsHandles`). The state
/// channel starts out as `WsConnectionState::Disconnected`.
pub(crate) fn make_channels() -> (WsChannels, WsHandles) {
    // Number of commands that may be queued before senders have to wait.
    const WS_COMMAND_CAPACITY: usize = 64;

    // The receiver created alongside the broadcast sender is discarded right
    // away, so `send` on this channel fails until a new receiver is attached
    // (presumably via `event_tx.subscribe()` — confirm against callers).
    let (event_tx, _) = broadcast::channel(WS_BROADCAST_CAPACITY);
    let (cmd_tx, cmd_rx) = mpsc::channel(WS_COMMAND_CAPACITY);
    let (state_tx, state_rx) = watch::channel(WsConnectionState::Disconnected);

    (
        // The sender is moved, not cloned: nothing else in this function keeps it.
        WsChannels { event_tx, state_tx },
        WsHandles {
            cmd_tx,
            cmd_rx,
            state_rx,
        },
    )
}
/// Command queue ends and state receiver produced by `make_channels`.
pub(crate) struct WsHandles {
/// Sending side of the command queue.
pub cmd_tx: mpsc::Sender<WsCommand>,
/// Receiving side of the command queue; `spawn_ws_task` takes a receiver of
/// this type (presumably this one — confirm against the caller).
pub cmd_rx: mpsc::Receiver<WsCommand>,
/// Receiver for observing `WsConnectionState` changes.
pub state_rx: watch::Receiver<WsConnectionState>,
}
pub(crate) fn spawn_ws_task(
inner: Arc<InnerClient>,
auth: Arc<AuthState>,
channels: WsChannels,
mut cmd_rx: mpsc::Receiver<WsCommand>,
logout_notify: Arc<Notify>,
) {
tokio::spawn(async move {
let mut session_rx = auth.session_tx.subscribe();
let mut backoff = Duration::from_secs(1);
loop {
loop {
if auth.session.read().await.is_some() {
break;
}
if session_rx.changed().await.is_err() {
return;
}
}
match connect_and_run(&inner, &auth, &channels, &mut cmd_rx, &logout_notify).await {
Ok(()) => {
let _ = channels.state_tx.send(WsConnectionState::Disconnected);
backoff = Duration::from_secs(1);
}
Err(GrindrError::Auth(_)) => {
tracing::warn!("[ws] auth error, waiting for next login");
let _ = channels.state_tx.send(WsConnectionState::Disconnected);
backoff = Duration::from_secs(1);
}
Err(e) => {
tracing::warn!("[ws] connection error: {e}; retrying in {backoff:?}");
let _ = channels.state_tx.send(WsConnectionState::Disconnected);
if auth.session.read().await.is_none() {
backoff = Duration::from_secs(1);
continue;
}
sleep(backoff).await;
backoff = (backoff * 2).min(Duration::from_secs(30));
}
}
}
});
}
/// Opens a single WebSocket connection and drives it until it ends.
///
/// Steps, in order: obtain the authorization header, read the current session
/// id, build the request headers from the client fingerprint, perform the
/// WebSocket upgrade against `WS_URL`, publish `WsConnectionState::Connected`,
/// and finally hand the socket to `run_message_loop`.
///
/// # Errors
///
/// - `GrindrError::Auth("not logged in")` when no authorization header or no
///   session is available.
/// - `GrindrError::Http` when sending the request or upgrading it fails.
/// - Any error propagated from `GrindrHeaders::build` or `run_message_loop`.
async fn connect_and_run(
    inner: &InnerClient,
    auth: &AuthState,
    channels: &WsChannels,
    cmd_rx: &mut mpsc::Receiver<WsCommand>,
    logout_notify: &Notify,
) -> Result<(), GrindrError> {
    let not_logged_in = || GrindrError::Auth("not logged in".to_owned());

    let Some(authorization) = crate::auth::authorization_header(inner, auth).await else {
        return Err(not_logged_in());
    };

    // The read guard lives only for this statement; nothing is awaited while
    // it is held.
    let session_id = match auth.session.read().await.as_ref() {
        Some(session) => session.session_id.clone(),
        None => return Err(not_logged_in()),
    };

    let fp = inner.fingerprint().await;
    let headers = GrindrHeaders::build(
        &fp.device,
        &fp.user_agent,
        Some(&authorization),
        Some("[FREE]"),
    )?;

    let mut request = fp.ws_http.websocket(WS_URL);
    for (name, value) in &headers.items {
        request = request.header(name.clone(), value.clone());
    }

    let mut ws = request
        .send()
        .await
        .map_err(|e| GrindrError::Http(format!("WS connect failed: {e}")))?
        .into_websocket()
        .await
        .map_err(|e| GrindrError::Http(format!("WS upgrade failed: {e}")))?;

    // Observers may have gone away; a failed state update is not an error here.
    let _ = channels.state_tx.send(WsConnectionState::Connected);

    let events = &channels.event_tx;
    run_message_loop(&mut ws, cmd_rx, &session_id, events, logout_notify).await
}
/// Pumps one open WebSocket until logout, channel closure, or a socket error.
///
/// Each iteration waits for whichever happens first:
/// - `logout_notify` fires: returns `Ok(())`;
/// - a frame arrives: text frames that parse as JSON and carry a string `type`
///   are broadcast as `WsEvent`, pings are answered with a pong, other frames
///   are ignored;
/// - a command arrives: it is sent as a JSON text frame with the keys `type`,
///   `ref`, `token` (the given `session_id`) and `payload`.
///
/// # Errors
///
/// Returns `GrindrError::Http` when the server closes the connection, the
/// stream ends, reading a frame fails, or sending a frame fails.
///
/// Returns `Ok(())` when logout is signalled or the command channel is closed.
async fn run_message_loop(
    ws: &mut WebSocket,
    cmd_rx: &mut mpsc::Receiver<WsCommand>,
    session_id: &str,
    event_tx: &broadcast::Sender<WsEvent>,
    logout_notify: &Notify,
) -> Result<(), GrindrError> {
    let logout = logout_notify.notified();
    tokio::pin!(logout);

    loop {
        tokio::select! {
            _ = &mut logout => {
                return Ok(());
            }

            incoming = ws.next() => {
                // First peel off the terminal cases, then handle the frame.
                let frame = match incoming {
                    None | Some(Ok(Message::Close(_))) => {
                        return Err(GrindrError::Http(
                            "WS connection closed by server".to_owned(),
                        ));
                    }
                    Some(Err(e)) => {
                        return Err(GrindrError::Http(e.to_string()));
                    }
                    Some(Ok(frame)) => frame,
                };

                match frame {
                    Message::Text(text) => {
                        // Unparseable text and messages without a string
                        // `type` are dropped silently.
                        if let Ok(payload) = serde_json::from_str::<Value>(text.as_str()) {
                            let event_type = payload["type"].as_str().map(str::to_owned);
                            if let Some(event_type) = event_type {
                                // Sending fails only when nobody is subscribed.
                                let _ = event_tx.send(WsEvent { event_type, payload });
                            }
                        }
                    }
                    Message::Ping(data) => {
                        ws.send(Message::Pong(data))
                            .await
                            .map_err(|e| GrindrError::Http(e.to_string()))?;
                    }
                    _ => {}
                }
            }

            outgoing = cmd_rx.recv() => {
                let Some(cmd) = outgoing else {
                    return Ok(());
                };
                let body = serde_json::json!({
                    "type": cmd.r#type,
                    "ref": cmd.ref_id,
                    "token": session_id,
                    "payload": cmd.payload,
                })
                .to_string();
                ws.send(Message::text(body))
                    .await
                    .map_err(|e| GrindrError::Http(e.to_string()))?;
            }
        }
    }
}