use std::net::IpAddr;
use std::sync::Arc;
use std::time::Duration;
use base64::Engine;
use base64::engine::general_purpose::STANDARD as BASE64;
use futures_util::{SinkExt, StreamExt};
use tokio::sync::Mutex;
use crate::Error;
use crate::frame_tv_transport::FrameTvTransport;
use crate::protocol::art_response::ArtResponse;
use crate::protocol::event;
use crate::protocol::ws_message::{WsIncoming, WsOutgoing};
use crate::transport::tls_config;
use crate::transport::ws_transport;
const WS_PORT: u16 = 8002;
const REMOTE_ENDPOINT: &str = "samsung.remote.control";
const ART_ENDPOINT: &str = "com.samsung.art-app";
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
type WsStream =
tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>;
struct ArtConnection {
write: futures_util::stream::SplitSink<WsStream, tokio_tungstenite::tungstenite::Message>,
read: futures_util::stream::SplitStream<WsStream>,
}
struct RemoteConnection {
write: futures_util::stream::SplitSink<WsStream, tokio_tungstenite::tungstenite::Message>,
#[allow(dead_code)]
read: futures_util::stream::SplitStream<WsStream>,
}
pub(crate) struct PersistentWsTransport {
host: IpAddr,
token: Arc<Mutex<Option<String>>>,
timeout: Duration,
art_conn: Mutex<Option<ArtConnection>>,
remote_conn: Mutex<Option<RemoteConnection>>,
}
impl PersistentWsTransport {
pub(crate) async fn connect(
host: IpAddr,
token: Option<String>,
timeout: Duration,
) -> Result<Self, Error> {
let token = Arc::new(Mutex::new(token));
let ws_url = build_ws_url(host, REMOTE_ENDPOINT, &*token.lock().await);
let connector = tls_config::make_tls_connector();
let (ws_stream, _) = tokio::time::timeout(timeout, async {
tokio_tungstenite::connect_async_tls_with_config(&ws_url, None, false, Some(connector))
.await
})
.await
.map_err(|_| Error::Timeout(timeout))?
.map_err(|e| Error::WebSocket(Box::new(e)))?;
let (_write, mut read) = ws_stream.split();
if let Some(Ok(tokio_tungstenite::tungstenite::Message::Text(text))) = read.next().await
&& let Ok(incoming) = serde_json::from_str::<WsIncoming>(&text)
{
match incoming.event.as_deref() {
Some(event::CHANNEL_UNAUTHORIZED) => {
return Err(Error::Unauthorized);
}
Some(event::CHANNEL_TIMEOUT) => {
return Err(Error::ConnectionFailed(
"TV rejected the connection (ms.channel.timeOut). \
Troubleshooting steps:\n\
1. Your device must be on the same local subnet as the TV \
(e.g. both on 192.168.1.x). The TV rejects WebSocket \
connections from other subnets entirely.\n\
2. Ensure IP Remote is enabled on the TV: \
Settings → Connection → Network → Expert Settings → IP Remote\n\
3. For first-time pairing, the TV screen must be on (not in \
standby or art mode) so the approval popup can appear."
.into(),
));
}
Some(event::CHANNEL_CONNECT) => {
if let Some(data) = &incoming.data
&& let Some(t) = data.get("token").and_then(|v| v.as_str())
{
*token.lock().await = Some(t.to_string());
}
}
_ => {}
}
}
Ok(Self {
host,
token,
timeout,
art_conn: Mutex::new(None),
remote_conn: Mutex::new(None),
})
}
pub(crate) async fn get_token(&self) -> Option<String> {
self.token.lock().await.clone()
}
pub(crate) async fn reconnect(&self) -> Result<(), Error> {
*self.art_conn.lock().await = None;
*self.remote_conn.lock().await = None;
let ws_url = build_ws_url(self.host, REMOTE_ENDPOINT, &*self.token.lock().await);
let connector = tls_config::make_tls_connector();
let (ws_stream, _) = tokio::time::timeout(self.timeout, async {
tokio_tungstenite::connect_async_tls_with_config(&ws_url, None, false, Some(connector))
.await
})
.await
.map_err(|_| Error::Timeout(self.timeout))?
.map_err(|e| Error::WebSocket(Box::new(e)))?;
let (_write, mut read) = ws_stream.split();
if let Some(Ok(tokio_tungstenite::tungstenite::Message::Text(text))) = read.next().await
&& let Ok(incoming) = serde_json::from_str::<WsIncoming>(&text)
{
match incoming.event.as_deref() {
Some(event::CHANNEL_UNAUTHORIZED) => return Err(Error::Unauthorized),
Some(event::CHANNEL_TIMEOUT) => {
return Err(Error::ConnectionFailed("TV rejected reconnection".into()));
}
Some(event::CHANNEL_CONNECT) => {
if let Some(data) = &incoming.data
&& let Some(t) = data.get("token").and_then(|v| v.as_str())
{
*self.token.lock().await = Some(t.to_string());
}
}
_ => {}
}
}
Ok(())
}
pub(crate) async fn is_connected(&self) -> bool {
self.art_conn.lock().await.is_some() || self.remote_conn.lock().await.is_some()
}
pub(crate) async fn send_keepalive(&self) {
Self::keepalive_conn(&self.art_conn, "art").await;
Self::keepalive_conn_remote(&self.remote_conn, "remote").await;
}
async fn keepalive_conn(conn: &Mutex<Option<ArtConnection>>, label: &str) {
let mut guard = conn.lock().await;
let Some(c) = guard.as_mut() else { return };
if let Err(e) = c
.write
.send(tokio_tungstenite::tungstenite::Message::Ping(vec![].into()))
.await
{
tracing::warn!("{label} keepalive: ping send failed: {e}, dropping connection");
*guard = None;
return;
}
loop {
match tokio::time::timeout(Duration::from_millis(100), c.read.next()).await {
Ok(Some(Ok(_))) => continue, Ok(Some(Err(e))) => {
tracing::warn!("{label} keepalive: read error: {e}, dropping connection");
*guard = None;
return;
}
Ok(None) => {
tracing::warn!("{label} keepalive: connection closed by peer");
*guard = None;
return;
}
Err(_) => break, }
}
tracing::debug!("{label} keepalive: ok");
}
async fn keepalive_conn_remote(conn: &Mutex<Option<RemoteConnection>>, label: &str) {
let mut guard = conn.lock().await;
let Some(c) = guard.as_mut() else { return };
if let Err(e) = c
.write
.send(tokio_tungstenite::tungstenite::Message::Ping(vec![].into()))
.await
{
tracing::warn!("{label} keepalive: ping send failed: {e}, dropping connection");
*guard = None;
return;
}
loop {
match tokio::time::timeout(Duration::from_millis(100), c.read.next()).await {
Ok(Some(Ok(_))) => continue,
Ok(Some(Err(e))) => {
tracing::warn!("{label} keepalive: read error: {e}, dropping connection");
*guard = None;
return;
}
Ok(None) => {
tracing::warn!("{label} keepalive: connection closed by peer");
*guard = None;
return;
}
Err(_) => break,
}
}
tracing::debug!("{label} keepalive: ok");
}
async fn ensure_art_connection(&self) -> Result<(), Error> {
let mut guard = self.art_conn.lock().await;
if guard.is_some() {
return Ok(());
}
let ws = open_ws(
self.host,
ART_ENDPOINT,
&*self.token.lock().await,
self.timeout,
)
.await?;
let (write, mut read) = ws.split();
wait_for_art_ready(&mut read).await?;
*guard = Some(ArtConnection { write, read });
Ok(())
}
async fn ensure_remote_connection(&self) -> Result<(), Error> {
let mut guard = self.remote_conn.lock().await;
if guard.is_some() {
return Ok(());
}
let ws = open_ws(
self.host,
REMOTE_ENDPOINT,
&*self.token.lock().await,
self.timeout,
)
.await?;
let (write, mut read) = ws.split();
if let Some(Ok(tokio_tungstenite::tungstenite::Message::Text(text))) = read.next().await
&& let Ok(msg) = serde_json::from_str::<WsIncoming>(&text)
&& msg.event.as_deref() == Some(event::CHANNEL_UNAUTHORIZED)
{
return Err(Error::Unauthorized);
}
*guard = Some(RemoteConnection { write, read });
Ok(())
}
fn invalidate_art_conn(guard: &mut Option<ArtConnection>) {
*guard = None;
}
fn invalidate_remote_conn(guard: &mut Option<RemoteConnection>) {
*guard = None;
}
}
impl FrameTvTransport for PersistentWsTransport {
async fn send_art_request(
&self,
request_json: &str,
wait_for_event: Option<&str>,
) -> Result<ArtResponse, Error> {
self.ensure_art_connection().await?;
let mut guard = self.art_conn.lock().await;
let conn = guard.as_mut().unwrap();
let outgoing = WsOutgoing::art_request(request_json);
let msg_text = serde_json::to_string(&outgoing)?;
let send_result = conn
.write
.send(tokio_tungstenite::tungstenite::Message::Text(
msg_text.clone().into(),
))
.await;
if let Err(_e) = send_result {
Self::invalidate_art_conn(&mut guard);
drop(guard);
self.ensure_art_connection().await?;
let mut guard = self.art_conn.lock().await;
let conn = guard.as_mut().unwrap();
conn.write
.send(tokio_tungstenite::tungstenite::Message::Text(
msg_text.into(),
))
.await
.map_err(|e| Error::WebSocket(Box::new(e)))?;
let result = read_d2d_response(&mut conn.read, wait_for_event).await;
if result.is_err() {
Self::invalidate_art_conn(&mut guard);
}
return result;
}
let result = read_d2d_response(&mut guard.as_mut().unwrap().read, wait_for_event).await;
if result.is_err() {
Self::invalidate_art_conn(&mut guard);
}
result
}
async fn send_remote_key(&self, key_code: &str) -> Result<(), Error> {
self.ensure_remote_connection().await?;
let mut guard = self.remote_conn.lock().await;
let conn = guard.as_mut().unwrap();
let outgoing = WsOutgoing::remote_key_click(key_code);
let msg_text = serde_json::to_string(&outgoing)?;
let send_result = conn
.write
.send(tokio_tungstenite::tungstenite::Message::Text(
msg_text.clone().into(),
))
.await;
if let Err(_e) = send_result {
Self::invalidate_remote_conn(&mut guard);
drop(guard);
self.ensure_remote_connection().await?;
let mut guard = self.remote_conn.lock().await;
let conn = guard.as_mut().unwrap();
conn.write
.send(tokio_tungstenite::tungstenite::Message::Text(
msg_text.into(),
))
.await
.map_err(|e| Error::WebSocket(Box::new(e)))?;
return Ok(());
}
Ok(())
}
async fn upload_image(
&self,
image_data: &[u8],
file_type: &str,
matte_id: Option<&str>,
) -> Result<ArtResponse, Error> {
let oneshot = ws_transport::WsTransport::connect_from_token(
self.host,
self.token.lock().await.clone(),
);
oneshot.upload_image(image_data, file_type, matte_id).await
}
async fn download_thumbnail(&self, content_id: &str) -> Result<Vec<u8>, Error> {
let oneshot = ws_transport::WsTransport::connect_from_token(
self.host,
self.token.lock().await.clone(),
);
oneshot.download_thumbnail(content_id).await
}
async fn tv_display_size(&self) -> Result<(u32, u32), Error> {
Ok((3840, 2160))
}
}
async fn open_ws(
host: IpAddr,
endpoint: &str,
token: &Option<String>,
timeout: Duration,
) -> Result<WsStream, Error> {
let ws_url = build_ws_url(host, endpoint, token);
let connector = tls_config::make_tls_connector();
let (ws_stream, _) = tokio::time::timeout(timeout, async {
tokio_tungstenite::connect_async_tls_with_config(&ws_url, None, false, Some(connector))
.await
})
.await
.map_err(|_| Error::Timeout(timeout))?
.map_err(|e| Error::WebSocket(Box::new(e)))?;
Ok(ws_stream)
}
fn build_ws_url(host: IpAddr, endpoint: &str, token: &Option<String>) -> String {
let name = BASE64.encode("framesmith");
let mut url = format!("wss://{host}:{WS_PORT}/api/v2/channels/{endpoint}?name={name}");
if let Some(token) = token {
url.push_str(&format!("&token={token}"));
}
url
}
async fn wait_for_art_ready(
read: &mut futures_util::stream::SplitStream<WsStream>,
) -> Result<(), Error> {
loop {
match tokio::time::timeout(Duration::from_secs(5), read.next()).await {
Ok(Some(Ok(tokio_tungstenite::tungstenite::Message::Text(text)))) => {
if let Ok(msg) = serde_json::from_str::<WsIncoming>(&text) {
match msg.event.as_deref() {
Some(event::CHANNEL_READY) => return Ok(()),
Some(event::CHANNEL_CONNECT) => continue,
Some(event::CHANNEL_UNAUTHORIZED) => return Err(Error::Unauthorized),
other => {
tracing::debug!("art ready wait: ignoring event {:?}", other);
continue;
}
}
}
}
Ok(Some(Ok(_))) => continue,
Ok(Some(Err(e))) => return Err(Error::WebSocket(Box::new(e))),
Ok(None) => return Err(Error::ConnectionFailed("art connection closed".into())),
Err(_) => {
return Err(Error::ConnectionFailed(
"Timed out waiting for art endpoint to become ready. \
The TV must be in art mode (not on the home screen or \
showing other content) for art commands to work."
.into(),
));
}
}
}
}
async fn read_d2d_response(
read: &mut futures_util::stream::SplitStream<WsStream>,
wait_for_event: Option<&str>,
) -> Result<ArtResponse, Error> {
loop {
match tokio::time::timeout(DEFAULT_TIMEOUT, read.next()).await {
Ok(Some(Ok(tokio_tungstenite::tungstenite::Message::Text(text)))) => {
tracing::debug!("d2d read: raw message: {text}");
if let Ok(msg) = serde_json::from_str::<WsIncoming>(&text)
&& msg.event.as_deref() == Some(event::D2D_SERVICE_MESSAGE)
&& let Some(data) = msg.data
{
let data_str = if data.is_string() {
data.as_str().unwrap().to_string()
} else {
data.to_string()
};
if let Ok(art_resp) = serde_json::from_str::<ArtResponse>(&data_str) {
if let Some(ref error_code) = art_resp.error_code {
return Err(Error::TvError {
request: "art_request".to_string(),
code: error_code.to_string(),
});
}
if let Some(wait_event) = wait_for_event {
if art_resp.event.as_deref() == Some(wait_event) {
return Ok(art_resp);
}
continue;
}
return Ok(art_resp);
}
}
}
Ok(Some(Ok(_))) => continue,
Ok(Some(Err(e))) => return Err(Error::WebSocket(Box::new(e))),
Ok(None) => return Err(Error::ConnectionFailed("connection closed".into())),
Err(_) => return Err(Error::Timeout(DEFAULT_TIMEOUT)),
}
}
}