use crate::bridge::fnc_frame_codec::{
BridgeCodec, Frame, FrameHeader, MsgKind, SessionInitPayload, FLAG_SIZE_EXCEEDED, FLAG_TOKEN_INVALID,
FLAG_TTL_EXPIRED,
};
use std::path::PathBuf;
use crate::bridge::fnc_clipboard_apply::{apply_session_to_clipboard_with, apply_warning_to_clipboard, BridgeClipboardOptions};
use crate::bridge::fnc_session_store::{BridgeSessionConfig, BridgeSessionId, SessionError, SessionStore, SessionStoreConfig};
use crate::config::{ResolvedBridgeConnect, ResolvedBridgeListen, parse_duration};
use crate::clipboard;
use anyhow::Result;
use bytes::Bytes;
use futures_util::{SinkExt, StreamExt};
use serde::{Deserialize, Serialize};
use tokio::net::{TcpListener, TcpStream};
use tokio::time::{timeout, Duration};
use std::time::Duration as StdDuration;
use tokio_util::codec::Framed;
use std::sync::{Arc, Mutex};
use std::thread;
use std::fs::OpenOptions;
use std::io::Write;
/// Reads a shared-secret token from the file at `path`.
///
/// Returns `None` when no path is configured or the file cannot be read.
/// Surrounding whitespace is trimmed; an empty file therefore yields
/// `Some("")` rather than `None`.
fn load_token(path: &Option<PathBuf>) -> Option<String> {
    let file = path.as_ref()?;
    match std::fs::read_to_string(file) {
        Ok(raw) => Some(raw.trim().to_owned()),
        Err(_) => None,
    }
}
/// Builds the one-line human-readable summary printed after a payload has
/// been applied to the clipboard.
///
/// The summary contains the content type, the size in KiB (two decimals), a
/// line count (number of `\n` bytes plus one, or 0 for an empty payload) and a
/// preview of at most the first 100 bytes decoded lossily as UTF-8.
fn build_summary(bytes: &[u8], content_type: &str) -> String {
    const PREVIEW_MAX: usize = 100;

    let size_kb = bytes.len() as f64 / 1024.0;
    let lines = if bytes.is_empty() {
        0
    } else {
        bytes.iter().filter(|&&b| b == b'\n').count() + 1
    };

    // When the payload is longer than the preview window, the cut may land in
    // the middle of a multi-byte UTF-8 character. Back off (at most three
    // bytes, the longest possible run of continuation bytes) so the preview
    // does not end in a spurious U+FFFD replacement character.
    let mut preview_len = bytes.len().min(PREVIEW_MAX);
    if preview_len < bytes.len() {
        let mut backed_off = 0;
        while backed_off < 3 && preview_len > 0 && (bytes[preview_len] & 0xC0) == 0x80 {
            preview_len -= 1;
            backed_off += 1;
        }
    }
    let preview = String::from_utf8_lossy(&bytes[..preview_len]);

    format!(
        "[bridge] {} | {:.2} KB | {} lines | preview: {}",
        content_type, size_kb, lines, preview
    )
}
/// Upper bound (5 seconds) on how long a peer may take to deliver a frame on
/// the reads in this file that are wrapped in `timeout`, e.g. the initial
/// Hello on the listener and the HelloAck on the client.
const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(5);
struct TcpListenerHandler<F>
where
F: Fn(&str) -> Result<()> + Send + Sync + 'static,
{
cfg: ResolvedBridgeListen,
set_clipboard: Arc<F>,
store: Arc<Mutex<SessionStore>>,
}
impl<F> TcpListenerHandler<F>
where
F: Fn(&str) -> Result<()> + Send + Sync + 'static,
{
fn new(cfg: ResolvedBridgeListen, set_clipboard: F) -> Self {
let ttl_ms: u64 = cfg
.ttl
.as_deref()
.and_then(parse_duration)
.unwrap_or_else(|| Duration::from_secs(2 * 60 * 60))
.as_millis() as u64;
let idle_ms: u64 = cfg
.idle_exit
.as_deref()
.and_then(parse_duration)
.unwrap_or_else(|| Duration::from_secs(2 * 60 * 60))
.as_millis() as u64;
let store_cfg = SessionStoreConfig {
default_max_bytes: cfg.max_bytes as u64,
default_ttl: StdDuration::from_millis(ttl_ms),
default_idle_exit: if idle_ms == 0 {
StdDuration::MAX
} else {
StdDuration::from_millis(idle_ms)
},
spill_threshold: cfg.max_bytes as u64 / 4, };
let store = SessionStore::new(store_cfg);
Self {
cfg,
set_clipboard: Arc::new(set_clipboard),
store,
}
}
async fn handle_handshake(
&self,
framed: &mut Framed<TcpStream, BridgeCodec>,
) -> Result<(HelloPayload, BridgeSessionId)> {
let hello_opt = timeout(HANDSHAKE_TIMEOUT, framed.next()).await?;
let hello_frame = match hello_opt {
Some(res) => res?,
None => anyhow::bail!("connection closed"),
};
let hello = deserialize_payload::<HelloPayload>(&hello_frame.payload)?;
let session_id = {
let mut s = self.store.lock().unwrap();
let cfg = BridgeSessionConfig {
max_bytes: s.config.default_max_bytes,
ttl: s.config.default_ttl,
idle_exit: s.config.default_idle_exit,
};
s.open_session(Some(cfg))
};
Ok((hello, session_id))
}
async fn run(self) -> Result<()> {
let listener = TcpListener::bind(&self.cfg.bind).await?;
let (stream, _) = listener.accept().await?;
let mut framed = Framed::new(stream, BridgeCodec::new(self.cfg.max_bytes as u64));
let (hello, session_id) = self.handle_handshake(&mut framed).await?;
if hello.max_bytes > self.cfg.max_bytes as u64 {
let payload = serialize_payload(&HelloAckPayload {
accepted: false,
reason: Some(format!("max_bytes exceeds listener cap {}", self.cfg.max_bytes)),
})?;
send_frame(&mut framed, MsgKind::HelloAck, 0, 0, payload).await?;
return Ok(());
}
let payload = serialize_payload(&HelloAckPayload {
accepted: true,
reason: None,
})?;
send_frame(&mut framed, MsgKind::HelloAck, 0, 0, payload).await?;
let init_frame = framed
.next()
.await
.transpose()?
.ok_or_else(|| anyhow::anyhow!("no session init"))?;
anyhow::ensure!(init_frame.header.kind == MsgKind::SessionInit, "expected SessionInit");
let init = deserialize_payload::<SessionInitPayload>(&init_frame.payload)?;
let required_token = self
.cfg
.token
.clone()
.or_else(|| load_token(&self.cfg.token_file));
if let Some(expected) = required_token {
match &init.token {
Some(t) if !t.is_empty() && t == &expected => {}
_ => {
{
let mut s = self.store.lock().unwrap();
s.record_abort_reason("token required or mismatch");
}
let payload = Bytes::from_static(b"token required or mismatch");
send_frame(
&mut framed,
MsgKind::SessionAbort,
init_frame.header.session_id,
FLAG_TOKEN_INVALID,
payload,
)
.await?;
return Ok(());
}
}
}
if init.total_len > self.cfg.max_bytes as u64 {
{
let mut s = self.store.lock().unwrap();
s.record_size_violation("session too large");
}
let payload = Bytes::from_static(b"session too large");
send_frame(
&mut framed,
MsgKind::SessionAbort,
init_frame.header.session_id,
FLAG_SIZE_EXCEEDED,
payload,
)
.await?;
return Ok(());
}
send_frame(&mut framed, MsgKind::Ack, init_frame.header.session_id, 0, Bytes::new()).await?;
if let Some(frame) = framed.next().await.transpose()? {
if frame.header.kind == MsgKind::Data {
let apply_res: Result<(), u16> = {
let mut s = self.store.lock().unwrap();
match s.put_chunk(session_id, &frame.payload) {
Ok(_) => {
let _ = s.finalize(session_id);
let res = apply_session_to_clipboard_with(
&mut s,
session_id,
&BridgeClipboardOptions::default(),
|text| (self.set_clipboard)(text),
);
if res.is_ok() {
s.mark_clipboard_applied(session_id);
println!(
"{}",
build_summary(&frame.payload, init.content_type.as_str())
);
}
res.map(|_| ()).map_err(|_| 0)
}
Err(SessionError::TooLarge { limit }) => {
s.record_size_violation("data chunk too large");
let warn = format!(
"wsl-clip: payload size exceeds configured max ({} bytes); nothing copied.",
limit
);
let _ = apply_warning_to_clipboard(&warn, |text| (self.set_clipboard)(text));
Err(FLAG_SIZE_EXCEEDED)
}
Err(SessionError::Expired) => {
s.record_ttl_violation("session expired");
let warn = "wsl-clip: TTL expired; re-approve with: wsl-clip bridge listen";
let _ = apply_warning_to_clipboard(warn, |text| (self.set_clipboard)(text));
Err(FLAG_TTL_EXPIRED)
}
Err(_) => Err(0),
}
};
match apply_res {
Ok(_) => {
let ack_payload = Bytes::from_static(b"ok");
send_frame(&mut framed, MsgKind::Ack, frame.header.session_id, 0, ack_payload).await?;
send_frame(&mut framed, MsgKind::SessionDone, frame.header.session_id, 0, Bytes::new())
.await?;
}
Err(flags) => {
let payload = Bytes::from_static(b"session error");
send_frame(&mut framed, MsgKind::SessionAbort, frame.header.session_id, flags, payload).await?;
}
}
}
}
Ok(())
}
}
/// Client side of the TCP bridge; owns the resolved connect configuration
/// used to dial the listener and build the frames to send.
struct TcpClientHandler {
    /// Resolved connection settings (target address, size cap, token sources).
    cfg: ResolvedBridgeConnect,
}
#[derive(Debug)]
struct TcpClientPlan {
frames: Vec<Frame>,
}
impl TcpClientPlan {
fn new(cfg: &ResolvedBridgeConnect) -> Result<Self> {
let hello = HelloPayload {
max_bytes: cfg.max_bytes as u64,
supports_udp: cfg.mode == crate::config::BridgeMode::Udp,
token_present: cfg.token.is_some() || cfg.token_file.is_some(),
};
let hello_bytes = serialize_payload(&hello)?;
let hello_frame = Frame { header: FrameHeader::new(MsgKind::Hello, 0, 0, 0, hello_bytes.len() as u32), payload: hello_bytes };
let data = Bytes::from_static(b"ping");
let init = SessionInitPayload {
total_len: data.len() as u64,
chunk_bytes: data.len() as u32,
ttl_ms: 0,
idle_ms: 0,
content_type: "text/plain".into(),
meta: vec![],
token: cfg.token.clone().or_else(|| load_token(&cfg.token_file)),
};
let init_bytes = serialize_payload(&init)?;
let init_frame = Frame { header: FrameHeader::new(MsgKind::SessionInit, 0, 1, 0, init_bytes.len() as u32), payload: init_bytes };
let data_frame = Frame { header: FrameHeader::new(MsgKind::Data, 0, 1, 0, data.len() as u32), payload: data.clone() };
Ok(Self { frames: vec![hello_frame, init_frame, data_frame] })
}
async fn execute(self, framed: &mut Framed<TcpStream, BridgeCodec>) -> Result<()> {
for frame in self.frames {
framed.send(frame).await?;
}
Ok(())
}
}
impl TcpClientHandler {
    /// Wraps the resolved connect configuration.
    fn new(cfg: ResolvedBridgeConnect) -> Self {
        Self { cfg }
    }

    /// Builds the frame plan for this handler's configuration and sends it
    /// over an already-connected `framed` transport.
    #[allow(dead_code)]
    async fn enqueue_plan(&self, framed: &mut Framed<TcpStream, BridgeCodec>) -> Result<()> {
        let plan = TcpClientPlan::new(&self.cfg)?;
        plan.execute(framed).await
    }

    /// Connects to the listener, sends the planned frames and validates the
    /// listener's replies.
    ///
    /// Expected replies, in order: HelloAck (must be accepted), Ack for the
    /// SessionInit, then either SessionAbort (reported as an error) or
    /// Ack followed by SessionDone.
    ///
    /// Every read is bounded by `HANDSHAKE_TIMEOUT`, so a listener that stops
    /// responding at any step makes this call fail instead of hanging.
    ///
    /// # Errors
    /// Fails on connect/transport/codec errors, timeouts, a rejected Hello,
    /// an unexpected frame kind where a specific one is required, or a
    /// SessionAbort from the listener.
    async fn run(self) -> Result<()> {
        let cfg = self.cfg;
        let stream = TcpStream::connect(&cfg.to).await?;
        let mut framed = Framed::new(stream, BridgeCodec::new(cfg.max_bytes as u64));

        // All three frames are sent up front; replies are consumed afterwards.
        let plan = TcpClientPlan::new(&cfg)?;
        plan.execute(&mut framed).await?;

        let ack_frame = timeout(HANDSHAKE_TIMEOUT, framed.next())
            .await?
            .transpose()?
            .ok_or_else(|| anyhow::anyhow!("connection closed"))?;
        let ack = deserialize_payload::<HelloAckPayload>(&ack_frame.payload)?;
        if !ack.accepted {
            anyhow::bail!("listener rejected: {:?}", ack.reason);
        }

        let init_ack = timeout(HANDSHAKE_TIMEOUT, framed.next())
            .await?
            .transpose()?
            .ok_or_else(|| anyhow::anyhow!("no session init ack"))?;
        anyhow::ensure!(init_ack.header.kind == MsgKind::Ack, "expected Ack to SessionInit");

        let reply = timeout(HANDSHAKE_TIMEOUT, framed.next())
            .await?
            .transpose()?
            .ok_or_else(|| anyhow::anyhow!("connection closed"))?;
        if reply.header.kind == MsgKind::SessionAbort {
            anyhow::bail!("server reported session error");
        }
        if reply.header.kind == MsgKind::Ack {
            // A closed connection after the Ack is tolerated; a frame that is
            // present must be SessionDone.
            if let Some(done) = timeout(HANDSHAKE_TIMEOUT, framed.next()).await?.transpose()? {
                anyhow::ensure!(done.header.kind == MsgKind::SessionDone, "expected SessionDone");
            }
        }
        Ok(())
    }

    /// Alias for [`run`](Self::run).
    #[allow(dead_code)]
    async fn enqueue_and_send(self) -> Result<()> {
        self.run().await
    }
}
/// Payload of the client's opening `Hello` frame.
///
/// Field order is part of the serialized layout and must not be changed.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct HelloPayload {
    /// Size cap the client is configured with, in bytes.
    pub max_bytes: u64,
    /// Whether the client is configured for UDP mode.
    pub supports_udp: bool,
    /// Whether the client has a token (inline or via file) configured.
    pub token_present: bool,
}
/// Payload of the listener's `HelloAck` reply.
///
/// Field order is part of the serialized layout and must not be changed.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct HelloAckPayload {
    /// `true` when the listener accepts the client's Hello.
    pub accepted: bool,
    /// Human-readable rejection reason; `None` when accepted.
    pub reason: Option<String>,
}
/// Encodes `value` with bincode and returns the result as `Bytes`.
///
/// # Errors
/// Propagates any bincode serialization error.
fn serialize_payload<T>(value: &T) -> Result<Bytes>
where
    T: Serialize,
{
    let encoded: Vec<u8> = bincode::serialize(value)?;
    Ok(encoded.into())
}
/// Decodes a bincode-encoded frame payload into `T`.
///
/// # Errors
/// Propagates any bincode deserialization error.
fn deserialize_payload<T>(bytes: &Bytes) -> Result<T>
where
    T: for<'de> Deserialize<'de>,
{
    let decoded = bincode::deserialize(bytes)?;
    Ok(decoded)
}
async fn send_frame(
stream: &mut Framed<TcpStream, BridgeCodec>,
kind: MsgKind,
session: u64,
flags: u16,
payload: Bytes,
) -> Result<()> {
let header = FrameHeader::new(kind, flags, session, 0, payload.len() as u32);
let frame = Frame { header, payload };
stream.send(frame).await?;
Ok(())
}
/// Runs the TCP listener with `clipboard::set_text_content` as the clipboard
/// sink.
pub async fn run_tcp_listener_stub(cfg: ResolvedBridgeListen) -> Result<()> {
    let system_clipboard = clipboard::set_text_content;
    run_tcp_listener_stub_with_clipboard(cfg, system_clipboard).await
}
/// Runs the TCP listener on a dedicated OS thread with its own tokio runtime.
///
/// When `cfg.log_file` is set, start, runtime-creation failure and exit
/// events are appended to it on a best-effort basis (log write errors are
/// ignored). The returned handle finishes when the listener returns.
pub fn spawn_tcp_listener_daemon(cfg: ResolvedBridgeListen) -> thread::JoinHandle<()> {
    /// Appends the rendered line to `target` if a log file is configured;
    /// the line is only rendered when it will actually be written.
    fn log_event(target: Option<&std::path::Path>, render: impl FnOnce() -> String) {
        if let Some(path) = target {
            let _ = append_log(path, &render());
        }
    }

    let bind = cfg.bind.clone();
    let log_path = cfg.log_file.clone();
    thread::spawn(move || {
        let target = log_path.as_deref();
        log_event(target, || {
            format!("level=INFO event=daemon_start mode=tcp bind={}", bind)
        });

        let runtime = match tokio::runtime::Runtime::new() {
            Ok(runtime) => runtime,
            Err(e) => {
                log_event(target, || {
                    format!("level=ERROR event=daemon_runtime_failed mode=tcp bind={} error={}", bind, e)
                });
                return;
            }
        };

        let outcome = runtime.block_on(run_tcp_listener_stub(cfg));
        log_event(target, || match &outcome {
            Ok(()) => format!("level=INFO event=daemon_exit mode=tcp bind={} result=ok", bind),
            Err(e) => format!("level=ERROR event=daemon_exit mode=tcp bind={} error={}", bind, e),
        });
    })
}
/// Appends `line` plus a trailing newline to the file at `path`, creating the
/// file if it does not exist.
///
/// The record is assembled in memory and written with a single `write_all`
/// call so that the text and its newline are not issued as separate writes
/// (which could interleave with other appenders to the same file).
///
/// # Errors
/// Returns the underlying I/O error if the file cannot be opened or written.
fn append_log(path: &std::path::Path, line: &str) -> std::io::Result<()> {
    let mut file = OpenOptions::new().create(true).append(true).open(path)?;
    let mut record = String::with_capacity(line.len() + 1);
    record.push_str(line);
    record.push('\n');
    file.write_all(record.as_bytes())
}
/// Runs the TCP listener, delivering received text to `set_clipboard`
/// instead of a fixed clipboard backend.
pub async fn run_tcp_listener_stub_with_clipboard<F>(
    cfg: ResolvedBridgeListen,
    set_clipboard: F,
) -> Result<()>
where
    F: Fn(&str) -> Result<()> + Send + Sync + 'static,
{
    let handler = TcpListenerHandler::new(cfg, set_clipboard);
    handler.run().await
}
/// Connects to the listener described by `cfg` and runs the client exchange
/// to completion.
pub async fn run_tcp_client_stub(cfg: ResolvedBridgeConnect) -> Result<()> {
    let client = TcpClientHandler::new(cfg);
    client.run().await
}
/// Spawns the client exchange as a tokio task and returns its join handle.
#[allow(dead_code)]
pub(crate) fn spawn_tcp_client_plan(cfg: ResolvedBridgeConnect) -> tokio::task::JoinHandle<Result<()>> {
    tokio::spawn(async move {
        let client = TcpClientHandler::new(cfg);
        client.run().await
    })
}
// Tests for the TCP bridge. Several tests script one side of the protocol by
// hand with `send_frame` / `framed.next()` and run the real implementation on
// the other side. Tests that need a concrete port bind to port 0, read the
// assigned port, drop that socket and hand the address to the listener, then
// sleep briefly before connecting.
#[cfg(test)]
mod tests {
use super::*;
use tokio::task;
use tokio::time::sleep;
use std::sync::{Arc, Mutex};
use std::time::Duration as StdDuration;
use tempfile::NamedTempFile;
use std::fs;
// Real client (`run_tcp_client_stub`) against a hand-scripted server that
// answers HelloAck, Ack, Ack("ok") and SessionDone.
#[tokio::test]
async fn handshake_and_ack_round_trip() {
let listener_cfg = ResolvedBridgeListen {
bind: "127.0.0.1:0".to_string(),
mode: crate::config::BridgeMode::Tcp,
idle_exit: Some("2h".into()),
ttl: None,
max_bytes: 1024 * 1024,
token: None,
token_file: None,
daemon: false,
log_file: None,
};
let listener = TcpListener::bind(&listener_cfg.bind).await.unwrap();
let addr = listener.local_addr().unwrap();
let server = task::spawn(async move {
let (stream, _) = listener.accept().await.unwrap();
let mut framed = Framed::new(stream, BridgeCodec::new(listener_cfg.max_bytes as u64));
let hello_frame = framed.next().await.transpose().unwrap().unwrap();
let hello = deserialize_payload::<HelloPayload>(&hello_frame.payload).unwrap();
assert!(hello.max_bytes <= listener_cfg.max_bytes as u64);
let ack = HelloAckPayload { accepted: true, reason: None };
let payload = serialize_payload(&ack).unwrap();
send_frame(&mut framed, MsgKind::HelloAck, 0, 0, payload).await.unwrap();
let init = framed.next().await.transpose().unwrap().unwrap();
assert_eq!(init.header.kind, MsgKind::SessionInit);
let init_payload: SessionInitPayload = deserialize_payload(&init.payload).unwrap();
assert!(init_payload.total_len > 0);
send_frame(&mut framed, MsgKind::Ack, init.header.session_id, 0, Bytes::new())
.await
.unwrap();
let data = framed.next().await.transpose().unwrap().unwrap();
assert_eq!(data.header.kind, MsgKind::Data);
let ack_payload = Bytes::from_static(b"ok");
send_frame(&mut framed, MsgKind::Ack, data.header.session_id, 0, ack_payload)
.await
.unwrap();
send_frame(&mut framed, MsgKind::SessionDone, data.header.session_id, 0, Bytes::new())
.await
.unwrap();
});
let client_cfg = ResolvedBridgeConnect {
to: addr.to_string(),
socket: std::path::PathBuf::from("/run/wsl-clip/socket"),
mode: crate::config::BridgeMode::Tcp,
token: None,
token_file: None,
max_bytes: 1024 * 1024,
};
run_tcp_client_stub(client_cfg).await.unwrap();
server.await.unwrap();
}
// Same exchange as above, but the client is started through
// `spawn_tcp_client_plan` and awaited via its join handle.
#[tokio::test]
async fn spawn_tcp_client_plan_runs() {
let listener_cfg = ResolvedBridgeListen {
bind: "127.0.0.1:0".to_string(),
mode: crate::config::BridgeMode::Tcp,
idle_exit: Some("2h".into()),
ttl: None,
max_bytes: 1024 * 1024,
token: None,
token_file: None,
daemon: false,
log_file: None,
};
let listener = TcpListener::bind(&listener_cfg.bind).await.unwrap();
let addr = listener.local_addr().unwrap();
let server = task::spawn(async move {
let (stream, _) = listener.accept().await.unwrap();
let mut framed = Framed::new(stream, BridgeCodec::new(listener_cfg.max_bytes as u64));
let hello_frame = framed.next().await.transpose().unwrap().unwrap();
let _hello: HelloPayload = deserialize_payload(&hello_frame.payload).unwrap();
let ack = HelloAckPayload { accepted: true, reason: None };
let payload = serialize_payload(&ack).unwrap();
send_frame(&mut framed, MsgKind::HelloAck, 0, 0, payload).await.unwrap();
let init = framed.next().await.transpose().unwrap().unwrap();
assert_eq!(init.header.kind, MsgKind::SessionInit);
send_frame(&mut framed, MsgKind::Ack, init.header.session_id, 0, Bytes::new()).await.unwrap();
let data = framed.next().await.transpose().unwrap().unwrap();
assert_eq!(data.header.kind, MsgKind::Data);
send_frame(&mut framed, MsgKind::Ack, data.header.session_id, 0, Bytes::new())
.await
.unwrap();
send_frame(&mut framed, MsgKind::SessionDone, data.header.session_id, 0, Bytes::new())
.await
.unwrap();
});
let client_cfg = ResolvedBridgeConnect {
to: addr.to_string(),
socket: std::path::PathBuf::from("/run/wsl-clip/socket"),
mode: crate::config::BridgeMode::Tcp,
token: None,
token_file: None,
max_bytes: 1024 * 1024,
};
let handle = spawn_tcp_client_plan(client_cfg);
handle.await.unwrap().unwrap();
server.await.unwrap();
}
// Real listener with a recording clipboard callback against a hand-scripted
// client; checks Ack + SessionDone and that the callback saw the data text.
#[tokio::test]
async fn listener_applies_clipboard_and_acks() {
let tmp_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = tmp_listener.local_addr().unwrap().port();
drop(tmp_listener);
let bind = format!("127.0.0.1:{port}");
let listener_cfg = ResolvedBridgeListen {
bind: bind.clone(),
mode: crate::config::BridgeMode::Tcp,
idle_exit: Some("2h".into()),
ttl: None,
max_bytes: 1024 * 1024,
token: None,
token_file: None,
daemon: false,
log_file: None,
};
let seen = Arc::new(Mutex::new(None::<String>));
let seen_clone = seen.clone();
let server = task::spawn(async move {
run_tcp_listener_stub_with_clipboard(listener_cfg, move |text| {
*seen_clone.lock().unwrap() = Some(text.to_string());
Ok(())
})
.await
});
// Give the spawned listener time to bind before connecting.
sleep(StdDuration::from_millis(50)).await;
let stream = TcpStream::connect(bind).await.unwrap();
let mut framed = Framed::new(stream, BridgeCodec::new(1024 * 1024));
let hello = HelloPayload {
max_bytes: 1024 * 1024,
supports_udp: false,
token_present: false,
};
let payload = serialize_payload(&hello).unwrap();
send_frame(&mut framed, MsgKind::Hello, 0, 0, payload).await.unwrap();
let ack = framed.next().await.transpose().unwrap().unwrap();
assert_eq!(ack.header.kind, MsgKind::HelloAck);
let hello_ack: HelloAckPayload = deserialize_payload(&ack.payload).unwrap();
assert!(hello_ack.accepted);
// NOTE(review): total_len/chunk_bytes are 15 while the data sent below is 14
// bytes ("clipboard-text") — confirm whether the mismatch is intentional.
let init = SessionInitPayload {
total_len: 15,
chunk_bytes: 15,
ttl_ms: 0,
idle_ms: 0,
content_type: "text/plain".into(),
meta: vec![],
token: Some("tok".into()),
};
let init_bytes = serialize_payload(&init).unwrap();
send_frame(&mut framed, MsgKind::SessionInit, 1, 0, init_bytes).await.unwrap();
let init_ack = framed.next().await.transpose().unwrap().unwrap();
assert_eq!(init_ack.header.kind, MsgKind::Ack);
let data = Bytes::from_static(b"clipboard-text");
send_frame(&mut framed, MsgKind::Data, 1, 0, data).await.unwrap();
let resp = framed.next().await.transpose().unwrap().unwrap();
assert_eq!(resp.header.kind, MsgKind::Ack);
let done = framed.next().await.transpose().unwrap().unwrap();
assert_eq!(done.header.kind, MsgKind::SessionDone);
server.await.unwrap().unwrap();
assert_eq!(seen.lock().unwrap().as_deref(), Some("clipboard-text"));
}
// Listener configured with a token; the client sends SessionInit without one
// and must receive SessionAbort with FLAG_TOKEN_INVALID set.
#[tokio::test]
async fn listener_aborts_on_token_mismatch() {
let tmp_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = tmp_listener.local_addr().unwrap().port();
drop(tmp_listener);
let bind = format!("127.0.0.1:{port}");
let listener_cfg = ResolvedBridgeListen {
bind: bind.clone(),
mode: crate::config::BridgeMode::Tcp,
idle_exit: Some("2h".into()),
ttl: None,
max_bytes: 1024 * 1024,
token: Some("secret".into()),
token_file: None,
daemon: false,
log_file: None,
};
let server = task::spawn(async move { run_tcp_listener_stub(listener_cfg).await });
sleep(StdDuration::from_millis(50)).await;
let stream = TcpStream::connect(bind).await.unwrap();
let mut framed = Framed::new(stream, BridgeCodec::new(1024 * 1024));
let hello = HelloPayload {
max_bytes: 1024 * 1024,
supports_udp: false,
token_present: false,
};
let payload = serialize_payload(&hello).unwrap();
send_frame(&mut framed, MsgKind::Hello, 0, 0, payload).await.unwrap();
let ack = framed.next().await.transpose().unwrap().unwrap();
assert_eq!(ack.header.kind, MsgKind::HelloAck);
let init = SessionInitPayload {
total_len: 4,
chunk_bytes: 4,
ttl_ms: 0,
idle_ms: 0,
content_type: "text/plain".into(),
meta: vec![],
token: None,
};
let init_bytes = serialize_payload(&init).unwrap();
send_frame(&mut framed, MsgKind::SessionInit, 1, 0, init_bytes).await.unwrap();
let abort = framed.next().await.transpose().unwrap().unwrap();
assert_eq!(abort.header.kind, MsgKind::SessionAbort);
assert_eq!(abort.header.flags & crate::bridge::fnc_frame_codec::FLAG_TOKEN_INVALID, crate::bridge::fnc_frame_codec::FLAG_TOKEN_INVALID);
server.await.unwrap().unwrap();
}
// Daemon thread with a log file: after one client run the log must contain
// both the daemon_start and daemon_exit events. The client result is ignored
// and the test relies on fixed sleeps rather than joining the daemon thread.
#[tokio::test]
async fn daemon_appends_logs() {
let tmp_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = tmp_listener.local_addr().unwrap().port();
drop(tmp_listener);
let log = NamedTempFile::new().unwrap();
let log_path = log.path().to_path_buf();
let bind = format!("127.0.0.1:{port}");
let listener_cfg = ResolvedBridgeListen {
bind: bind.clone(),
mode: crate::config::BridgeMode::Tcp,
idle_exit: Some("5s".into()),
ttl: None,
max_bytes: 64 * 1024,
token: None,
token_file: None,
daemon: true,
log_file: Some(log_path.clone()),
};
let _handle = spawn_tcp_listener_daemon(listener_cfg);
sleep(StdDuration::from_millis(50)).await;
let client_cfg = ResolvedBridgeConnect {
to: bind,
socket: std::path::PathBuf::from("/run/wsl-clip/socket"),
mode: crate::config::BridgeMode::Tcp,
token: None,
token_file: None,
max_bytes: 64 * 1024,
};
let _ = run_tcp_client_stub(client_cfg).await;
sleep(StdDuration::from_millis(100)).await;
let contents = fs::read_to_string(&log_path).unwrap();
assert!(contents.contains("daemon_start"));
assert!(contents.contains("daemon_exit"));
}
// Listener capped at 64 bytes; a SessionInit declaring 128 bytes must be
// answered with SessionAbort carrying FLAG_SIZE_EXCEEDED.
#[tokio::test]
async fn listener_sets_size_flag_on_too_large_sessioninit() {
let tmp_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = tmp_listener.local_addr().unwrap().port();
drop(tmp_listener);
let bind = format!("127.0.0.1:{port}");
let listener_cfg = ResolvedBridgeListen {
bind: bind.clone(),
mode: crate::config::BridgeMode::Tcp,
idle_exit: Some("2h".into()),
ttl: None,
max_bytes: 64,
token: None,
token_file: None,
daemon: false,
log_file: None,
};
let server = task::spawn(async move { run_tcp_listener_stub(listener_cfg).await });
sleep(StdDuration::from_millis(80)).await;
// Retry the connect a few times in case the listener is not bound yet.
let mut stream = None;
for _ in 0..3 {
match TcpStream::connect(bind.clone()).await {
Ok(s) => {
stream = Some(s);
break;
}
Err(_) => sleep(StdDuration::from_millis(20)).await,
}
}
let stream = stream.expect("failed to connect to listener");
let mut framed = Framed::new(stream, BridgeCodec::new(1024));
let hello = HelloPayload { max_bytes: 64, supports_udp: false, token_present: false };
let payload = serialize_payload(&hello).unwrap();
send_frame(&mut framed, MsgKind::Hello, 0, 0, payload).await.unwrap();
let _ack = framed.next().await.transpose().unwrap().unwrap();
let init = SessionInitPayload {
total_len: 128,
chunk_bytes: 128,
ttl_ms: 0,
idle_ms: 0,
content_type: "text/plain".into(),
meta: vec![],
token: None,
};
let init_bytes = serialize_payload(&init).unwrap();
send_frame(&mut framed, MsgKind::SessionInit, 1, 0, init_bytes).await.unwrap();
let abort = framed.next().await.transpose().expect("frame read failed");
let abort = abort.expect("no frame after oversized init");
assert_eq!(abort.header.kind, MsgKind::SessionAbort);
assert_eq!(abort.header.flags & crate::bridge::fnc_frame_codec::FLAG_SIZE_EXCEEDED, crate::bridge::fnc_frame_codec::FLAG_SIZE_EXCEEDED);
server.await.unwrap().unwrap();
}
// Listener configured with ttl "0"; the Data frame must be answered with
// SessionAbort carrying FLAG_TTL_EXPIRED. This test goes through
// `run_tcp_listener_stub`, i.e. the `clipboard::set_text_content` sink.
#[tokio::test]
async fn listener_sets_ttl_flag_on_expired_session() {
let tmp_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = tmp_listener.local_addr().unwrap().port();
drop(tmp_listener);
let bind = format!("127.0.0.1:{port}");
let listener_cfg = ResolvedBridgeListen {
bind: bind.clone(),
mode: crate::config::BridgeMode::Tcp,
idle_exit: Some("2h".into()),
ttl: Some("0".into()),
max_bytes: 1024,
token: None,
token_file: None,
daemon: false,
log_file: None,
};
let server = task::spawn(async move { run_tcp_listener_stub(listener_cfg).await });
sleep(StdDuration::from_millis(30)).await;
let stream = TcpStream::connect(bind).await.unwrap();
let mut framed = Framed::new(stream, BridgeCodec::new(1024));
let hello = HelloPayload { max_bytes: 1024, supports_udp: false, token_present: false };
let payload = serialize_payload(&hello).unwrap();
send_frame(&mut framed, MsgKind::Hello, 0, 0, payload).await.unwrap();
let _ack = framed.next().await.transpose().unwrap().unwrap();
let init = SessionInitPayload {
total_len: 4,
chunk_bytes: 4,
ttl_ms: 0,
idle_ms: 0,
content_type: "text/plain".into(),
meta: vec![],
token: None,
};
let init_bytes = serialize_payload(&init).unwrap();
send_frame(&mut framed, MsgKind::SessionInit, 1, 0, init_bytes).await.unwrap();
let init_ack = framed.next().await.transpose().unwrap().unwrap();
assert_eq!(init_ack.header.kind, MsgKind::Ack);
let data = Bytes::from_static(b"pong");
send_frame(&mut framed, MsgKind::Data, 1, 0, data).await.unwrap();
let abort = framed.next().await.transpose().unwrap().unwrap();
assert_eq!(abort.header.kind, MsgKind::SessionAbort);
assert_eq!(abort.header.flags & crate::bridge::fnc_frame_codec::FLAG_TTL_EXPIRED, crate::bridge::fnc_frame_codec::FLAG_TTL_EXPIRED);
server.await.unwrap().unwrap();
}
// Real client against real listener with a recording clipboard callback; the
// client's fixed "ping" payload must reach the callback.
#[tokio::test]
async fn tcp_end_to_end_mock_clipboard() {
let tmp_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = tmp_listener.local_addr().unwrap().port();
drop(tmp_listener);
let bind = format!("127.0.0.1:{port}");
let listener_cfg = ResolvedBridgeListen {
bind: bind.clone(),
mode: crate::config::BridgeMode::Tcp,
idle_exit: Some("5m".into()),
ttl: None,
max_bytes: 128 * 1024,
token: None,
token_file: None,
daemon: false,
log_file: None,
};
let seen = Arc::new(Mutex::new(None::<String>));
let seen_clone = seen.clone();
let server = task::spawn(async move {
run_tcp_listener_stub_with_clipboard(listener_cfg, move |text| {
*seen_clone.lock().unwrap() = Some(text.to_string());
Ok(())
})
.await
});
sleep(StdDuration::from_millis(50)).await;
let client_cfg = ResolvedBridgeConnect {
to: bind,
socket: std::path::PathBuf::from("/run/wsl-clip/socket"),
mode: crate::config::BridgeMode::Tcp,
token: None,
token_file: None,
max_bytes: 128 * 1024,
};
run_tcp_client_stub(client_cfg).await.unwrap();
server.await.unwrap().unwrap();
let val = seen.lock().unwrap().clone().expect("clipboard should be set");
assert_eq!(val.as_str(), "ping"); }
// The client plan consists of exactly Hello, SessionInit and Data, in order.
#[test]
fn tcp_client_plan_builds_frames() {
let cfg = ResolvedBridgeConnect {
to: "127.0.0.1:8121".into(),
socket: std::path::PathBuf::from("/run/wsl-clip/socket"),
mode: crate::config::BridgeMode::Tcp,
token: None,
token_file: None,
max_bytes: 1024,
};
let plan = TcpClientPlan::new(&cfg).unwrap();
assert_eq!(plan.frames.len(), 3);
assert_eq!(plan.frames[0].header.kind, MsgKind::Hello);
assert_eq!(plan.frames[1].header.kind, MsgKind::SessionInit);
assert_eq!(plan.frames[2].header.kind, MsgKind::Data);
}
}