#![forbid(unsafe_code)]
use std::io::{Read, Write};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Duration;
use anyhow::Result;
use tear_client::Transport;
use tracing::{debug, error, info, warn};
use tungstenite::Message;
#[derive(Clone, Debug)]
pub struct BridgeConfig {
pub listen: SocketAddr,
pub tear: Transport,
}
pub struct BridgeHandle {
stop: Arc<AtomicBool>,
accept: Option<thread::JoinHandle<()>>,
pub listen: SocketAddr,
}
impl BridgeHandle {
pub fn stop(mut self) {
self.signal_and_join();
}
fn signal_and_join(&mut self) {
self.stop.store(true, Ordering::SeqCst);
if let Some(j) = self.accept.take() {
let _ = j.join();
}
}
}
impl Drop for BridgeHandle {
fn drop(&mut self) {
if self.accept.is_some() {
self.signal_and_join();
}
}
}
pub fn start(cfg: BridgeConfig) -> Result<BridgeHandle> {
let listener = TcpListener::bind(cfg.listen)?;
let bound = listener.local_addr()?;
listener.set_nonblocking(true)?;
info!(
listen = %bound,
tear = %cfg.tear.display_string(),
"tear-ws-bridge listening"
);
let stop = Arc::new(AtomicBool::new(false));
let stop_for_accept = stop.clone();
let tear_for_accept = cfg.tear.clone();
let accept = thread::Builder::new()
.name("tear-ws-bridge-accept".into())
.spawn(move || accept_loop(listener, stop_for_accept, tear_for_accept))?;
Ok(BridgeHandle {
stop,
accept: Some(accept),
listen: bound,
})
}
fn accept_loop(listener: TcpListener, stop: Arc<AtomicBool>, tear: Transport) {
loop {
if stop.load(Ordering::SeqCst) {
debug!("ws-bridge accept loop: stop requested");
return;
}
match listener.accept() {
Ok((stream, peer)) => {
debug!(peer = %peer, "ws connection accepted");
let tear_for_conn = tear.clone();
let _ = thread::Builder::new()
.name("tear-ws-bridge-conn".into())
.spawn(move || {
let _ = stream.set_nonblocking(false);
if let Err(e) = serve_ws_connection(stream, tear_for_conn) {
warn!(error = %e, "ws connection ended");
}
});
}
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
std::thread::sleep(Duration::from_millis(50));
}
Err(e) => {
error!(error = %e, "ws accept failed");
std::thread::sleep(Duration::from_millis(50));
}
}
}
}
fn serve_ws_connection(tcp: TcpStream, tear: Transport) -> Result<()> {
let ws = tungstenite::accept(tcp)?;
let backend = match &tear {
Transport::Unix(p) => TearStream::Unix(std::os::unix::net::UnixStream::connect(p)?),
Transport::Tcp(addr) => TearStream::Tcp(TcpStream::connect(addr)?),
};
pump(ws, backend)
}
enum TearStream {
Unix(std::os::unix::net::UnixStream),
Tcp(TcpStream),
}
impl TearStream {
fn try_clone(&self) -> std::io::Result<Self> {
match self {
TearStream::Unix(s) => s.try_clone().map(TearStream::Unix),
TearStream::Tcp(s) => s.try_clone().map(TearStream::Tcp),
}
}
}
impl Read for TearStream {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
match self {
TearStream::Unix(s) => s.read(buf),
TearStream::Tcp(s) => s.read(buf),
}
}
}
impl Write for TearStream {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
match self {
TearStream::Unix(s) => s.write(buf),
TearStream::Tcp(s) => s.write(buf),
}
}
fn flush(&mut self) -> std::io::Result<()> {
match self {
TearStream::Unix(s) => s.flush(),
TearStream::Tcp(s) => s.flush(),
}
}
}
fn pump(
mut ws: tungstenite::WebSocket<TcpStream>,
mut backend: TearStream,
) -> Result<()> {
let backend_for_reader = backend
.try_clone()
.map_err(|e| anyhow::anyhow!("clone backend: {e}"))?;
let ws_writer = ws.get_mut().try_clone()?;
let _reader = thread::Builder::new()
.name("tear-ws-bridge-daemon-reader".into())
.spawn(move || {
let mut br = std::io::BufReader::new(backend_for_reader);
let mut writer = tungstenite::WebSocket::from_raw_socket(
ws_writer,
tungstenite::protocol::Role::Server,
None,
);
loop {
let mut len_bytes = [0u8; 4];
if br.read_exact(&mut len_bytes).is_err() {
let _ = writer.close(None);
return;
}
let len = u32::from_be_bytes(len_bytes) as usize;
if len > tear_types::wire::MAX_FRAME_BYTES {
warn!(len, "ws-bridge daemon→ws: frame too large, closing");
let _ = writer.close(None);
return;
}
let mut payload = vec![0u8; len];
if br.read_exact(&mut payload).is_err() {
let _ = writer.close(None);
return;
}
let mut framed = Vec::with_capacity(4 + len);
framed.extend_from_slice(&len_bytes);
framed.extend_from_slice(&payload);
if writer.send(Message::Binary(framed.into())).is_err() {
return;
}
}
})?;
loop {
match ws.read() {
Ok(Message::Binary(bytes)) => {
if backend.write_all(&bytes).is_err() {
return Ok(());
}
if backend.flush().is_err() {
return Ok(());
}
}
Ok(Message::Close(_)) => return Ok(()),
Ok(_) => continue,
Err(_) => return Ok(()),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn end_to_end_ws_bridge_pipes_request_response() {
use std::net::TcpStream as StdTcpStream;
use tear_types::wire::{read_msg, write_msg, Request, Response};
let sock = {
let mut p = std::env::temp_dir();
let pid = std::process::id();
p.push(format!("tear-ws-bridge-test-{pid}.sock"));
p
};
let _ = std::fs::remove_file(&sock);
let inproc = std::sync::Arc::new(tear_core::InProcess::new());
let daemon = tear_daemon::start(sock.clone(), inproc).expect("daemon");
std::thread::sleep(Duration::from_millis(50));
let bridge = start(BridgeConfig {
listen: "127.0.0.1:0".parse().unwrap(),
tear: Transport::Unix(sock.clone()),
})
.expect("bridge");
std::thread::sleep(Duration::from_millis(50));
let url = format!("ws://{}/", bridge.listen);
let stream = StdTcpStream::connect(bridge.listen).expect("connect");
let (mut ws, _resp) = tungstenite::client(url, stream).expect("ws client");
let mut req_bytes = Vec::new();
write_msg(&mut req_bytes, &Request::ListSessions).unwrap();
ws.send(Message::Binary(req_bytes.into())).unwrap();
let msg = ws.read().expect("read");
match msg {
Message::Binary(b) => {
let mut cursor = std::io::Cursor::new(b.as_ref());
let resp: Response = read_msg(&mut cursor).expect("decode");
match resp {
Response::Sessions(s) => {
assert_eq!(s.len(), 0, "no sessions on a fresh daemon");
}
other => panic!("unexpected response: {other:?}"),
}
}
other => panic!("expected binary frame, got {other:?}"),
}
ws.close(None).ok();
bridge.stop();
daemon.stop();
}
}