use super::commands::Command;
use super::events::Event;
use super::options::SoopChatOptions;
use crate::SoopHttpClient;
use crate::chat::commands::MessageType;
use crate::chat::formatter::ChatFormatter;
use crate::chat::message::MessageHandler;
use crate::chat::verification::NoVerification;
use crate::error::{Error, Result};
use crate::models::LiveDetail;
use futures_util::lock::Mutex;
use futures_util::stream::SplitStream;
use futures_util::{SinkExt, StreamExt, stream::SplitSink};
use reqwest::header::HeaderValue;
use rustls::ClientConfig;
use rustls::crypto::CryptoProvider;
use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpStream;
use tokio::sync::{broadcast, mpsc};
use tokio_tungstenite::connect_async_tls_with_config;
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, tungstenite::protocol::Message};
use url::Url;
/// Handle for a chat connection to a single streamer's channel.
///
/// Constructing the handle does not open a connection; `start` spawns the
/// background task that does. Commands are pushed to that task through
/// `command`, and events are received through `subscribe`.
pub struct SoopChatConnection {
    /// HTTP client used by `start` to look up the live broadcast details.
    client: Arc<SoopHttpClient>,
    /// Sending half of the command channel; also cloned into the background task.
    command_tx: mpsc::Sender<Command>,
    /// Receiving half of the command channel. Wrapped in `Option` so that
    /// `start` can move it into the background task exactly once.
    command_rx: Mutex<Option<mpsc::Receiver<Command>>>,
    /// Broadcast sender from which every subscriber's receiver is created.
    event_tx: broadcast::Sender<Event>,
    /// Connection options; this file reads `streamer_id` and `password` from it.
    options: SoopChatOptions,
}
/// State owned by the background task for the lifetime of one session.
struct ConnectionLoopState {
    /// Receives commands for the running session (e.g. `Command::Shutdown`).
    command_rx: mpsc::Receiver<Command>,
    /// Sender for the same command channel; a clone is handed to the message handler.
    command_tx: mpsc::Sender<Command>,
    /// Broadcast sender used to publish events such as `Connected` and `Disconnected`.
    event_tx: broadcast::Sender<Event>,
    /// WebSocket URL the session connects to.
    connection_url: String,
    /// Live broadcast details, passed on to the chat formatter.
    live_detail: LiveDetail,
    /// Password, passed on to the chat formatter.
    password: String,
}
impl SoopChatConnection {
pub fn new(soop_http_client: Arc<SoopHttpClient>, options: SoopChatOptions) -> Result<Self> {
if CryptoProvider::get_default().is_none() {
rustls::crypto::aws_lc_rs::default_provider()
.install_default()
.expect("Failed to install default crypto provider");
}
let (command_tx, command_rx) = mpsc::channel(32);
let (event_tx, _) = broadcast::channel(1024);
Ok(Self {
command_tx,
command_rx: Mutex::new(Some(command_rx)),
event_tx,
client: soop_http_client,
options,
})
}
pub fn command(&self, command: Command) -> Result<()> {
self.command_tx
.try_send(command)
.map_err(|e| Error::InternalChannel(e.to_string()))
}
fn make_connection_url(&self, live_detail: &LiveDetail) -> String {
format!(
"wss://{}:{}/Websocket/{}",
live_detail.ch_domain.to_lowercase(),
live_detail.ch_pt + 1,
self.options.streamer_id
)
}
pub async fn start(&self) -> Result<()> {
let (is_live, optional_live_detail) = self
.client
.get_live_detail_state(&self.options.streamer_id)
.await?;
if !is_live {
return Err(Error::StreamOffline);
} else if optional_live_detail.is_none() {
return Err(Error::InternalChannel(
"생방송 정보가 잘못되었습니다.".to_string(),
));
}
let live_detail = optional_live_detail.unwrap();
let connection_url = self.make_connection_url(&live_detail);
let mut rx_guard = self.command_rx.lock().await;
if let Some(command_rx) = rx_guard.take() {
let loop_state = ConnectionLoopState {
command_tx: self.command_tx.clone(),
command_rx,
event_tx: self.event_tx.clone(),
connection_url,
live_detail,
password: self.options.password.clone(),
};
tokio::spawn(run_connection_loop(loop_state));
Ok(())
} else {
Err(Error::AlreadyStarted)
}
}
pub fn subscribe(&self) -> broadcast::Receiver<Event> {
self.event_tx.subscribe()
}
}
/// Runs a single chat session to completion and announces its end.
///
/// The session is attempted exactly once; there is no reconnect. Whether it
/// ends cleanly or with an error, `Event::Disconnected` is published so that
/// subscribers waiting on the event stream learn that the session is over.
/// Errors are additionally logged to standard output.
async fn run_connection_loop(mut state: ConnectionLoopState) {
    if let Err(e) = try_connect_and_run_session(&mut state).await {
        // No retry follows, so the log must not promise one.
        println!("[System] Connection error: {:?}. Session ended.", e);
    }
    // A failed broadcast send only means nobody is subscribed right now.
    state.event_tx.send(Event::Disconnected).ok();
}
/// Opens the WebSocket, performs the chat handshake, and runs the session
/// until it ends.
///
/// Steps: parse `state.connection_url`, request the `chat` sub-protocol,
/// connect over TLS, publish `Event::Connected`, send the formatter's connect
/// packet, then hand over to `run_communication_loop`.
///
/// # Errors
///
/// * URL parsing or request construction failures are propagated.
/// * `Error::ConnectionFailed` when the WebSocket connection cannot be made.
/// * Any error from sending the connect packet or from the communication loop.
async fn try_connect_and_run_session(state: &mut ConnectionLoopState) -> Result<()> {
    let url = Url::parse(&state.connection_url)?;
    println!("[System] Attempting to connect to WebSocket: {}", url);

    let mut request = url.into_client_request()?;
    request
        .headers_mut()
        .insert("Sec-WebSocket-Protocol", HeaderValue::from_static("chat"));

    // NOTE(security): judging by its name and the `dangerous()` builder,
    // `NoVerification` appears to skip server certificate validation, which
    // would leave the connection open to man-in-the-middle attacks. Left as is
    // because it looks deliberate; confirm and replace with a real verifier if
    // the server presents a valid certificate chain.
    let config = ClientConfig::builder()
        .dangerous()
        .with_custom_certificate_verifier(Arc::new(NoVerification))
        .with_no_client_auth();

    let (ws_stream, _) = connect_async_tls_with_config(
        request,
        None,
        true,
        Some(tokio_tungstenite::Connector::Rustls(Arc::new(config))),
    )
    .await
    .map_err(|e| Error::ConnectionFailed(e.to_string()))?;

    // A broadcast send fails only when there are no receivers at this moment;
    // receivers can still be created later, so this must not abort a
    // connection that was just established.
    state.event_tx.send(Event::Connected).ok();

    let (mut writer, mut reader) = ws_stream.split();
    let formatter = ChatFormatter::new(state.live_detail.clone(), state.password.clone());
    let connect_packet = formatter.format_message(MessageType::Connect);
    writer.send(Message::Binary(connect_packet)).await?;

    run_communication_loop(state, &mut reader, &mut writer, &formatter).await
}
/// Drives an established session: processes incoming frames, reacts to
/// commands, and sends a ping every 60 seconds.
///
/// Returns `Ok(())` when `Command::Shutdown` is received or when the incoming
/// stream ends (the peer closed the connection).
///
/// # Errors
///
/// Propagates read errors from the WebSocket, errors returned by the message
/// handler, and write errors when sending a response or a ping.
async fn run_communication_loop(
    state: &mut ConnectionLoopState,
    reader: &mut SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
    writer: &mut SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
    formatter: &ChatFormatter,
) -> Result<()> {
    // The first tick of a tokio interval completes immediately, so one ping is
    // sent as soon as the loop starts and then every 60 seconds.
    let mut ping_interval = tokio::time::interval(Duration::from_secs(60));
    let handler = MessageHandler::new(formatter, state.event_tx.clone(), state.command_tx.clone());

    loop {
        tokio::select! {
            incoming = reader.next() => {
                // `None` means the stream has ended. Matching `Some(..)` in the
                // branch pattern would merely disable this branch and leave the
                // loop running until the next ping write failed, so the end of
                // the stream is handled explicitly.
                let msg_result = match incoming {
                    Some(msg_result) => msg_result,
                    None => return Ok(()),
                };
                let raw = msg_result?.into_data();
                if let Some(resp) = handler.handle(raw)? {
                    writer.send(Message::Binary(resp)).await?;
                }
            },
            Some(command) = state.command_rx.recv() => {
                // Only `Shutdown` is acted upon; every other command is
                // currently ignored.
                if matches!(command, Command::Shutdown) {
                    return Ok(());
                }
            },
            _ = ping_interval.tick() => {
                let msg = formatter.format_message(MessageType::Ping);
                writer.send(Message::Binary(msg)).await?;
            }
        }
    }
}