use crate::bridge::fnc_frame_codec::{BridgeCodec, BridgeError, Frame, FrameHeader, MsgKind, SessionInitPayload, AckBitmapPayload, FLAG_SIZE_EXCEEDED, FLAG_TTL_EXPIRED, FLAG_TOKEN_INVALID};
use crate::bridge::fnc_session_store::{BridgeSessionConfig, SessionError, SessionStore, SessionStoreConfig};
use crate::bridge::fnc_clipboard_apply::{apply_session_to_clipboard_with, BridgeClipboardOptions};
use crate::config::{ResolvedBridgeConnect, ResolvedBridgeListen, parse_duration};
use crate::clipboard;
use anyhow::Result;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use serde::{Deserialize, Serialize};
use tokio::net::UdpSocket;
use tokio::time::{timeout, Duration};
use tokio_util::codec::{Decoder, Encoder};
use std::time::Duration as StdDuration;
use std::sync::Arc;
use std::thread;
use std::fs::OpenOptions;
use std::io::Write;
const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(2);
fn build_summary(bytes: &[u8], content_type: &str) -> String {
let size_kb = bytes.len() as f64 / 1024.0;
let lines = if bytes.is_empty() { 0 } else { bytes.iter().filter(|&&b| b == b'\n').count() + 1 };
let preview_len = bytes.len().min(100);
let preview = String::from_utf8_lossy(&bytes[..preview_len]);
format!(
"[bridge] {} | {:.2} KB | {} lines | preview: {}",
content_type, size_kb, lines, preview
)
}
struct UdpListenerHandler<F>
where
F: Fn(&str) -> Result<()> + Send + Sync + 'static,
{
cfg: ResolvedBridgeListen,
set_clipboard: Arc<F>,
store: Arc<std::sync::Mutex<SessionStore>>,
socket: UdpSocket,
codec: BridgeCodec,
}
struct UdpClientHandler {
cfg: ResolvedBridgeConnect,
socket: UdpSocket,
server: std::net::SocketAddr,
codec: BridgeCodec,
}
#[derive(Debug)]
struct UdpClientPlan {
frames: Vec<Frame>,
}
impl UdpClientPlan {
fn new(cfg: &ResolvedBridgeConnect) -> Result<Self> {
let hello = UdpHelloPayload {
max_bytes: cfg.max_bytes as u64,
token_present: cfg.token.is_some() || cfg.token_file.is_some(),
};
let hello_bytes = serialize_payload(&hello)?;
let hello_frame = Frame { header: FrameHeader::new(MsgKind::Hello, 0, 0, 0, hello_bytes.len() as u32), payload: hello_bytes };
let data = Bytes::from_static(b"pingpong" );
let chunk_bytes = 4u32;
let token = cfg.token.clone().or_else(|| load_token(&cfg.token_file));
let init = SessionInitPayload {
total_len: data.len() as u64,
chunk_bytes,
ttl_ms: 5_000,
idle_ms: 0,
content_type: "application/octet-stream".to_string(),
meta: Vec::new(),
token,
};
let init_payload = serialize_payload(&init)?;
let init_frame = Frame { header: FrameHeader::new(MsgKind::SessionInit, 0, 1, 0, init_payload.len() as u32), payload: init_payload };
let total_chunks = ((data.len() as u32 + chunk_bytes - 1) / chunk_bytes) as usize;
let mut frames = vec![hello_frame, init_frame];
for idx in 0..total_chunks {
let start = idx as u32 * chunk_bytes;
let end = std::cmp::min(start + chunk_bytes, data.len() as u32);
let frame = make_data_frame(1, idx as u32, &data[start as usize..end as usize]);
frames.push(frame);
}
Ok(Self { frames })
}
#[cfg(test)]
fn new_with_payload(cfg: &ResolvedBridgeConnect, data: Bytes, chunk_bytes: u32) -> Result<Self> {
let hello = UdpHelloPayload {
max_bytes: cfg.max_bytes as u64,
token_present: cfg.token.is_some() || cfg.token_file.is_some(),
};
let hello_bytes = serialize_payload(&hello)?;
let hello_frame = Frame { header: FrameHeader::new(MsgKind::Hello, 0, 0, 0, hello_bytes.len() as u32), payload: hello_bytes };
let token = cfg.token.clone().or_else(|| load_token(&cfg.token_file));
let init = SessionInitPayload {
total_len: data.len() as u64,
chunk_bytes,
ttl_ms: 5_000,
idle_ms: 0,
content_type: "application/octet-stream".to_string(),
meta: Vec::new(),
token,
};
let init_payload = serialize_payload(&init)?;
let init_frame = Frame { header: FrameHeader::new(MsgKind::SessionInit, 0, 1, 0, init_payload.len() as u32), payload: init_payload };
let total_chunks = ((data.len() as u32 + chunk_bytes - 1) / chunk_bytes) as usize;
let mut frames = vec![hello_frame, init_frame];
for idx in 0..total_chunks {
let start = idx as u32 * chunk_bytes;
let end = std::cmp::min(start + chunk_bytes, data.len() as u32);
let frame = make_data_frame(1, idx as u32, &data[start as usize..end as usize]);
frames.push(frame);
}
Ok(Self { frames })
}
}
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct UdpHelloPayload {
pub max_bytes: u64,
pub token_present: bool,
}
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct UdpHelloAckPayload {
pub accepted: bool,
pub reason: Option<String>,
}
fn serialize_payload<T: Serialize>(value: &T) -> Result<Bytes> {
Ok(Bytes::from(bincode::serialize(value)?))
}
fn deserialize_payload<T: for<'de> Deserialize<'de>>(bytes: &Bytes) -> Result<T> {
Ok(bincode::deserialize(bytes)?)
}
fn load_token(path: &Option<std::path::PathBuf>) -> Option<String> {
path.as_ref()
.and_then(|p| std::fs::read_to_string(p).ok())
.map(|s| s.trim().to_string())
}
fn build_bitmap_window(received: &[bool], base: usize, width_bits: usize) -> Vec<u8> {
let width_bits = width_bits.min(256);
let width_bytes = (width_bits + 7) / 8;
let mut bitmap = vec![0u8; width_bytes];
for i in 0..width_bits {
let idx = base + i;
if idx < received.len() && received[idx] {
bitmap[i / 8] |= 1 << (i % 8);
}
}
bitmap
}
async fn udp_send(socket: &UdpSocket, target: &std::net::SocketAddr, frame: Frame, codec: &mut BridgeCodec) -> Result<()> {
let mut buf = bytes::BytesMut::new();
codec.encode(frame, &mut buf)?;
if buf.len() as u64 > codec.max_payload_bytes {
return Err(BridgeError::TooLarge {
max: codec.max_payload_bytes,
got: buf.len() as u64,
}
.into());
}
socket.send_to(&buf, target).await?;
Ok(())
}
fn make_data_frame(session_id: u64, chunk_idx: u32, payload: &[u8]) -> Frame {
let mut buf = BytesMut::with_capacity(4 + payload.len());
buf.put_u32(chunk_idx);
buf.extend_from_slice(payload);
Frame {
header: FrameHeader::new(MsgKind::Data, 0, session_id, 0, buf.len() as u32),
payload: buf.freeze(),
}
}
impl<F> UdpListenerHandler<F>
where
F: Fn(&str) -> Result<()> + Send + Sync + 'static,
{
async fn new(cfg: ResolvedBridgeListen, set_clipboard: F) -> Result<Self> {
let socket = UdpSocket::bind(&cfg.bind).await?;
let codec = BridgeCodec::new(cfg.max_bytes as u64 + 4096);
let ttl_ms: u64 = cfg
.ttl
.as_deref()
.and_then(parse_duration)
.unwrap_or_else(|| Duration::from_secs(2 * 60 * 60))
.as_millis() as u64;
let idle_ms: u64 = cfg
.idle_exit
.as_deref()
.and_then(parse_duration)
.unwrap_or_else(|| Duration::from_secs(2 * 60 * 60))
.as_millis() as u64;
let store_cfg = SessionStoreConfig {
default_max_bytes: cfg.max_bytes as u64,
default_ttl: StdDuration::from_millis(ttl_ms),
default_idle_exit: if idle_ms == 0 {
StdDuration::MAX
} else {
StdDuration::from_millis(idle_ms)
},
spill_threshold: cfg.max_bytes as u64 / 4,
};
let store = SessionStore::new(store_cfg);
Ok(Self {
cfg,
set_clipboard: Arc::new(set_clipboard),
store,
socket,
codec,
})
}
async fn run(mut self) -> Result<()> {
let (frame, peer) = match udp_recv(&self.socket, &mut self.codec, HANDSHAKE_TIMEOUT).await? {
Some((f, addr)) => (f, addr),
None => anyhow::bail!("no hello received"),
};
if frame.header.kind != MsgKind::Hello {
anyhow::bail!("expected hello");
}
let hello = deserialize_payload::<UdpHelloPayload>(&frame.payload)?;
let session_id = {
let mut s = self.store.lock().unwrap();
let cfg = BridgeSessionConfig {
max_bytes: s.config.default_max_bytes,
ttl: s.config.default_ttl,
idle_exit: s.config.default_idle_exit,
};
s.open_session(Some(cfg))
};
if hello.max_bytes > self.cfg.max_bytes as u64 {
let payload = serialize_payload(&UdpHelloAckPayload {
accepted: false,
reason: Some(format!("max_bytes exceeds listener cap {}", self.cfg.max_bytes)),
})?;
let ack = Frame {
header: FrameHeader::new(MsgKind::HelloAck, FLAG_SIZE_EXCEEDED, 0, 0, payload.len() as u32),
payload,
};
udp_send(&self.socket, &peer, ack, &mut self.codec).await?;
return Ok(());
}
let payload = serialize_payload(&UdpHelloAckPayload { accepted: true, reason: None })?;
let ack = Frame {
header: FrameHeader::new(MsgKind::HelloAck, 0, 0, 0, payload.len() as u32),
payload,
};
udp_send(&self.socket, &peer, ack, &mut self.codec).await?;
let init = udp_recv(&self.socket, &mut self.codec, HANDSHAKE_TIMEOUT).await?;
let (init_frame, addr) = init.ok_or_else(|| anyhow::anyhow!("no session init"))?;
anyhow::ensure!(addr == peer, "session init from wrong peer");
anyhow::ensure!(init_frame.header.kind == MsgKind::SessionInit, "expected SessionInit");
let init_payload: SessionInitPayload = deserialize_payload(&init_frame.payload)?;
let required_token = self
.cfg
.token
.clone()
.or_else(|| load_token(&self.cfg.token_file));
if let Some(expected) = required_token {
match &init_payload.token {
Some(t) if !t.is_empty() && t == &expected => {}
_ => {
let payload = Bytes::from_static(b"token required or mismatch");
let abort = Frame {
header: FrameHeader::new(
MsgKind::SessionAbort,
FLAG_TOKEN_INVALID,
init_frame.header.session_id,
0,
payload.len() as u32,
),
payload,
};
udp_send(&self.socket, &peer, abort, &mut self.codec).await?;
return Ok(());
}
}
}
if init_payload.total_len > self.cfg.max_bytes as u64 {
let warn = format!(
"wsl-clip: payload size exceeds configured max ({} bytes); nothing copied.",
self.cfg.max_bytes
);
let _ = (self.set_clipboard)(&warn);
{
let mut s = self.store.lock().unwrap();
s.record_size_violation("session too large");
}
let err = Frame {
header: FrameHeader::new(MsgKind::Error, FLAG_SIZE_EXCEEDED, init_frame.header.session_id, 0, 0),
payload: Bytes::from_static(b"session too large"),
};
udp_send(&self.socket, &peer, err, &mut self.codec).await?;
return Ok(());
}
let total_chunks = ((init_payload.total_len + init_payload.chunk_bytes as u64 - 1)
/ init_payload.chunk_bytes as u64) as usize;
let mut received = vec![false; total_chunks];
let mut pending: Vec<Option<Bytes>> = vec![None; total_chunks];
let mut base = 0usize;
let mut next_index = 0usize;
let mut last_chunk_preview: Option<Bytes> = None;
send_ack_now(base, &received, &self.socket, &peer, init_frame.header.session_id, &mut self.codec).await?;
while let Some((f, addr)) = udp_recv(&self.socket, &mut self.codec, HANDSHAKE_TIMEOUT).await? {
if addr != peer || f.header.kind != MsgKind::Data {
continue;
}
let mut p = f.payload.clone();
if p.len() < 4 {
continue;
}
let idx = p.get_u32() as usize;
if idx >= total_chunks {
continue;
}
let payload = p.copy_to_bytes(p.remaining());
let _ = last_chunk_preview.replace(payload.clone());
if received[idx] {
send_ack_now(base, &received, &self.socket, &peer, init_frame.header.session_id, &mut self.codec)
.await?;
continue;
}
pending[idx] = Some(payload);
received[idx] = true;
let mut all_done = false;
let apply_res = {
let mut s = self.store.lock().unwrap();
let mut err_flag = None;
while next_index < total_chunks {
let chunk = match pending[next_index].take() {
Some(chunk) => chunk,
None => break,
};
match s.put_chunk(session_id, &chunk) {
Ok(_) => {
next_index += 1;
}
Err(SessionError::TooLarge { limit }) => {
pending[next_index] = Some(chunk);
let warn = format!(
"wsl-clip: payload size exceeds configured max ({} bytes); nothing copied.",
limit
);
let _ = (self.set_clipboard)(&warn);
s.record_size_violation("data chunk too large");
err_flag = Some(FLAG_SIZE_EXCEEDED);
break;
}
Err(SessionError::Expired) => {
pending[next_index] = Some(chunk);
let warn = "wsl-clip: TTL expired; re-approve with: wsl-clip bridge listen";
let _ = (self.set_clipboard)(warn);
s.record_ttl_violation("ttl expired");
err_flag = Some(FLAG_TTL_EXPIRED);
break;
}
Err(_) => {
pending[next_index] = Some(chunk);
err_flag = Some(0);
break;
}
}
}
if let Some(flag) = err_flag {
Err(flag)
} else {
if received.iter().all(|r| *r) && next_index == total_chunks {
let _ = s.finalize(session_id);
let _ = apply_session_to_clipboard_with(
&mut s,
session_id,
&BridgeClipboardOptions::default(),
|text| (self.set_clipboard)(text),
);
s.mark_clipboard_applied(session_id);
if let Some(preview) = last_chunk_preview.as_ref() {
println!(
"{}",
build_summary(preview, init_payload.content_type.as_str())
);
} else {
println!(
"{}",
build_summary(&[], init_payload.content_type.as_str())
);
}
all_done = true;
}
Ok(())
}
};
match apply_res {
Ok(_) => {
if all_done {
let complete_base = received.len();
send_ack_now(
complete_base,
&received,
&self.socket,
&peer,
init_frame.header.session_id,
&mut self.codec,
)
.await?;
base = complete_base;
continue;
} else {
while base < received.len() && received[base] {
base += 1;
}
send_ack_now(base, &received, &self.socket, &peer, init_frame.header.session_id, &mut self.codec)
.await?;
}
}
Err(flags) => {
let err = Frame {
header: FrameHeader::new(MsgKind::Error, flags, f.header.session_id, 0, 0),
payload: Bytes::from_static(b"session error"),
};
udp_send(&self.socket, &peer, err, &mut self.codec).await?;
}
}
}
Ok(())
}
}
impl UdpClientHandler {
async fn new(cfg: ResolvedBridgeConnect) -> Result<Self> {
let socket = UdpSocket::bind("0.0.0.0:0").await?;
let server: std::net::SocketAddr = cfg.to.parse()?;
let codec = BridgeCodec::new(cfg.max_bytes as u64);
Ok(Self {
cfg,
socket,
server,
codec,
})
}
async fn run(mut self) -> Result<()> {
let hello = UdpHelloPayload {
max_bytes: self.cfg.max_bytes as u64,
token_present: self.cfg.token.is_some() || self.cfg.token_file.is_some(),
};
let payload = serialize_payload(&hello)?;
let frame = Frame {
header: FrameHeader::new(MsgKind::Hello, 0, 0, 0, payload.len() as u32),
payload,
};
udp_send(&self.socket, &self.server, frame, &mut self.codec).await?;
let (ack, addr) = match udp_recv(&self.socket, &mut self.codec, HANDSHAKE_TIMEOUT).await? {
Some(res) => res,
None => anyhow::bail!("no ack"),
};
if addr != self.server || ack.header.kind != MsgKind::HelloAck {
anyhow::bail!("bad ack");
}
let plan = UdpClientPlan::new(&self.cfg)?;
let mut iter = plan.frames.into_iter();
let init_frame = iter
.find(|f| f.header.kind == MsgKind::SessionInit)
.expect("plan missing session init");
let data_frames: Vec<Frame> = iter.filter(|f| f.header.kind == MsgKind::Data).collect();
udp_send(&self.socket, &self.server, init_frame, &mut self.codec).await?;
let total_chunks = data_frames.len();
let mut acked = vec![false; total_chunks];
let max_retries = (total_chunks as u32).saturating_add(10);
let mut attempts = 0u32;
for frame in &data_frames {
udp_send(&self.socket, &self.server, frame.clone(), &mut self.codec).await?;
}
while attempts <= max_retries && acked.iter().any(|a| !*a) {
match udp_recv(&self.socket, &mut self.codec, HANDSHAKE_TIMEOUT).await? {
Some((ackb, _)) if ackb.header.kind == MsgKind::AckBitmap => {
let ack_payload: AckBitmapPayload = deserialize_payload(&ackb.payload)?;
let base = ack_payload.base_chunk as usize;
for i in 0..base.min(total_chunks) {
acked[i] = true;
}
for bit in 0..ack_payload.bitmap.len() * 8 {
let idx = base + bit;
if idx >= total_chunks {
break;
}
let byte = bit / 8;
let mask = 1u8 << (bit % 8);
if (ack_payload.bitmap[byte] & mask) != 0 {
acked[idx] = true;
}
}
for idx in 0..total_chunks {
if !acked[idx] {
udp_send(&self.socket, &self.server, data_frames[idx].clone(), &mut self.codec).await?;
}
}
}
_ => {
for idx in 0..total_chunks {
if !acked[idx] {
udp_send(&self.socket, &self.server, data_frames[idx].clone(), &mut self.codec).await?;
}
}
}
}
attempts += 1;
}
if acked.iter().any(|a| !*a) {
anyhow::bail!("udp bridge: retries exceeded");
}
Ok(())
}
}
async fn send_ack_now(
base: usize,
received: &Vec<bool>,
socket: &UdpSocket,
peer: &std::net::SocketAddr,
session_id: u64,
codec: &mut BridgeCodec,
) -> Result<()> {
let bitmap = build_bitmap_window(received, base, 64);
let ackb = AckBitmapPayload { base_chunk: base as u32, bitmap };
let ack_payload = serialize_payload(&ackb)?;
let ack = Frame { header: FrameHeader::new(MsgKind::AckBitmap, 0, session_id, 0, ack_payload.len() as u32), payload: ack_payload };
udp_send(socket, peer, ack, codec).await
}
async fn udp_recv(socket: &UdpSocket, codec: &mut BridgeCodec, timeout_dur: Duration) -> Result<Option<(Frame, std::net::SocketAddr)>> {
let mut buf = vec![0u8; codec.max_payload_bytes as usize + 32];
let (len, addr) = match timeout(timeout_dur, socket.recv_from(&mut buf)).await {
Ok(Ok(res)) => res,
Ok(Err(e)) => return Err(e.into()),
Err(_) => return Ok(None),
};
if len == 0 {
return Ok(None);
}
let mut bytes = bytes::BytesMut::from(&buf[..len]);
match codec.decode(&mut bytes) {
Ok(Some(frame)) => Ok(Some((frame, addr))),
Ok(None) => Ok(None),
Err(_e) => {
Ok(None)
}
}
}
pub async fn run_udp_listener_stub(cfg: ResolvedBridgeListen) -> Result<()> {
run_udp_listener_stub_with_clipboard(cfg, clipboard::set_text_content).await
}
pub fn spawn_udp_listener_daemon(cfg: ResolvedBridgeListen) -> thread::JoinHandle<()> {
let log_path = cfg.log_file.clone();
let bind = cfg.bind.clone();
thread::spawn(move || {
if let Some(path) = log_path.as_ref() {
let _ = append_log(path, &format!("level=INFO event=daemon_start mode=udp bind={}", bind));
}
let rt = match tokio::runtime::Runtime::new() {
Ok(rt) => rt,
Err(e) => {
if let Some(path) = log_path.as_ref() {
let _ = append_log(path, &format!("level=ERROR event=daemon_runtime_failed mode=udp bind={} error={}", bind, e));
}
return;
}
};
match rt.block_on(run_udp_listener_stub(cfg)) {
Ok(()) => {
if let Some(path) = log_path.as_ref() {
let _ = append_log(path, &format!("level=INFO event=daemon_exit mode=udp bind={} result=ok", bind));
}
}
Err(e) => {
if let Some(path) = log_path.as_ref() {
let _ = append_log(path, &format!("level=ERROR event=daemon_exit mode=udp bind={} error={}", bind, e));
}
}
}
})
}
fn append_log(path: &std::path::Path, line: &str) -> std::io::Result<()> {
let mut f = OpenOptions::new().create(true).append(true).open(path)?;
writeln!(f, "{}", line)
}
pub async fn run_udp_listener_stub_with_clipboard<F>(
cfg: ResolvedBridgeListen,
set_clipboard: F,
) -> Result<()>
where
F: Fn(&str) -> Result<()> + Send + Sync + 'static,
{
let handler = UdpListenerHandler::new(cfg, set_clipboard).await?;
handler.run().await
}
pub async fn run_udp_client_stub(cfg: ResolvedBridgeConnect) -> Result<()> {
let handler = UdpClientHandler::new(cfg).await?;
handler.run().await
}
#[allow(dead_code)]
pub(crate) fn spawn_udp_client_plan(cfg: ResolvedBridgeConnect) -> tokio::task::JoinHandle<Result<()>> {
tokio::spawn(async move {
let handler = UdpClientHandler::new(cfg).await?;
handler.run().await
})
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::task;
use std::sync::{Arc, Mutex};
use tokio::time::sleep;
use std::time::Duration as StdDuration;
use tempfile::NamedTempFile;
use std::fs;
#[tokio::test]
async fn udp_round_trip_stub() {
let bind_socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let bind = bind_socket.local_addr().unwrap();
drop(bind_socket);
let listener_cfg = ResolvedBridgeListen {
bind: bind.to_string(),
mode: crate::config::BridgeMode::Udp,
idle_exit: Some("2h".into()),
ttl: None,
max_bytes: 64 * 1024,
token: None,
token_file: None,
daemon: false,
log_file: None,
};
let server = task::spawn(async move {
run_udp_listener_stub(listener_cfg).await.unwrap();
});
let client_cfg = ResolvedBridgeConnect {
to: bind.to_string(),
socket: std::path::PathBuf::from("/run/wsl-clip/socket"),
mode: crate::config::BridgeMode::Udp,
token: None,
token_file: None,
max_bytes: 64 * 1024,
};
run_udp_client_stub(client_cfg).await.unwrap();
server.await.unwrap();
}
#[tokio::test]
async fn udp_listener_applies_clipboard_and_acks() {
let bind_socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let addr = bind_socket.local_addr().unwrap();
drop(bind_socket);
let listener_cfg = ResolvedBridgeListen {
bind: addr.to_string(),
mode: crate::config::BridgeMode::Udp,
idle_exit: Some("2h".into()),
ttl: None,
max_bytes: 64 * 1024,
token: None,
token_file: None,
daemon: false,
log_file: None,
};
let seen = Arc::new(Mutex::new(None::<String>));
let seen_clone = seen.clone();
let server = task::spawn(async move {
run_udp_listener_stub_with_clipboard(listener_cfg, move |text| {
*seen_clone.lock().unwrap() = Some(text.to_string());
Ok(())
})
.await
});
sleep(StdDuration::from_millis(50)).await;
let socket = UdpSocket::bind("0.0.0.0:0").await.unwrap();
let mut codec = BridgeCodec::new(64 * 1024);
let hello = UdpHelloPayload {
max_bytes: 64 * 1024,
token_present: false,
};
let payload = serialize_payload(&hello).unwrap();
let frame = Frame { header: FrameHeader::new(MsgKind::Hello, 0, 0, 0, payload.len() as u32), payload };
udp_send(&socket, &addr, frame, &mut codec).await.unwrap();
let (ack, from) = udp_recv(&socket, &mut codec, HANDSHAKE_TIMEOUT).await.unwrap().unwrap();
assert_eq!(from, addr);
assert_eq!(ack.header.kind, MsgKind::HelloAck);
let init = SessionInitPayload {
total_len: 11,
chunk_bytes: 11,
ttl_ms: 0,
idle_ms: 0,
content_type: "application/octet-stream".to_string(),
meta: Vec::new(),
token: None,
};
let init_payload = serialize_payload(&init).unwrap();
let init_frame = Frame { header: FrameHeader::new(MsgKind::SessionInit, 0, 1, 0, init_payload.len() as u32), payload: init_payload };
udp_send(&socket, &addr, init_frame, &mut codec).await.unwrap();
let mut ackb_opt = None;
for _ in 0..3 {
if let Some(val) = udp_recv(&socket, &mut codec, HANDSHAKE_TIMEOUT).await.unwrap() {
ackb_opt = Some(val);
break;
}
}
let ackb = ackb_opt.expect("expected AckBitmap after SessionInit");
assert_eq!(ackb.0.header.kind, MsgKind::AckBitmap);
let data = make_data_frame(1, 0, b"hello-world");
udp_send(&socket, &addr, data, &mut codec).await.unwrap();
let mut got_ack = false;
for _ in 0..5 {
if let Some((resp, _)) = udp_recv(&socket, &mut codec, HANDSHAKE_TIMEOUT).await.unwrap() {
if resp.header.kind == MsgKind::AckBitmap {
got_ack = true;
break;
}
}
}
assert!(got_ack, "expected AckBitmap after Data");
server.await.unwrap().unwrap();
assert_eq!(seen.lock().unwrap().as_deref(), Some("hello-world"));
}
#[tokio::test]
async fn udp_client_retries_when_bitmap_shows_gap() {
let tmp = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let bind = tmp.local_addr().unwrap();
drop(tmp);
let server = tokio::spawn(async move {
let socket = UdpSocket::bind(bind).await.unwrap();
let mut codec = BridgeCodec::new(64 * 1024);
let hello = udp_recv(&socket, &mut codec, HANDSHAKE_TIMEOUT).await.unwrap().unwrap();
let ack_payload = serialize_payload(&UdpHelloAckPayload { accepted: true, reason: None }).unwrap();
let ack = Frame { header: FrameHeader::new(MsgKind::HelloAck, 0, 0, 0, ack_payload.len() as u32), payload: ack_payload };
udp_send(&socket, &hello.1, ack, &mut codec).await.unwrap();
let init = udp_recv(&socket, &mut codec, HANDSHAKE_TIMEOUT).await.unwrap().unwrap();
assert_eq!(init.0.header.kind, MsgKind::SessionInit);
let ackb0 = AckBitmapPayload { base_chunk: 0, bitmap: vec![0] };
let ackb0_bytes = serialize_payload(&ackb0).unwrap();
let ackb0_frame = Frame { header: FrameHeader::new(MsgKind::AckBitmap, 0, init.0.header.session_id, 0, ackb0_bytes.len() as u32), payload: ackb0_bytes };
udp_send(&socket, &init.1, ackb0_frame, &mut codec).await.unwrap();
let mut data_seen = 0;
loop {
if let Some((d, peer)) = udp_recv(&socket, &mut codec, HANDSHAKE_TIMEOUT).await.unwrap() {
if d.header.kind == MsgKind::Data {
data_seen += 1;
if data_seen == 1 {
let nack = AckBitmapPayload { base_chunk: 0, bitmap: vec![0] };
let nack_bytes = serialize_payload(&nack).unwrap();
let frame = Frame { header: FrameHeader::new(MsgKind::AckBitmap, 0, d.header.session_id, 0, nack_bytes.len() as u32), payload: nack_bytes };
udp_send(&socket, &peer, frame, &mut codec).await.unwrap();
} else {
let ackb = AckBitmapPayload { base_chunk: 0, bitmap: vec![0b0000_0011] };
let bytes = serialize_payload(&ackb).unwrap();
let frame = Frame { header: FrameHeader::new(MsgKind::AckBitmap, 0, d.header.session_id, 0, bytes.len() as u32), payload: bytes };
udp_send(&socket, &peer, frame, &mut codec).await.unwrap();
return data_seen;
}
}
}
}
});
let client_cfg = ResolvedBridgeConnect {
to: bind.to_string(),
socket: std::path::PathBuf::from("/run/wsl-clip/socket"),
mode: crate::config::BridgeMode::Udp,
token: None,
token_file: None,
max_bytes: 64 * 1024,
};
run_udp_client_stub(client_cfg).await.unwrap();
let data_seen = server.await.unwrap();
assert!(data_seen >= 2, "client should resend after gap bitmap");
}
#[tokio::test]
async fn udp_listener_sends_error_and_warning_on_size() {
let bind = UdpSocket::bind("127.0.0.1:0").await.unwrap().local_addr().unwrap();
let listener_cfg = ResolvedBridgeListen {
bind: bind.to_string(),
mode: crate::config::BridgeMode::Udp,
idle_exit: Some("2h".into()),
ttl: None,
max_bytes: 8,
token: None,
token_file: None,
daemon: false,
log_file: None,
};
let seen = Arc::new(Mutex::new(None::<String>));
let seen_clone = seen.clone();
let server = task::spawn(async move {
run_udp_listener_stub_with_clipboard(listener_cfg, move |text| {
*seen_clone.lock().unwrap() = Some(text.to_string());
Ok(())
})
.await
});
sleep(StdDuration::from_millis(50)).await;
let socket = UdpSocket::bind("0.0.0.0:0").await.unwrap();
let mut codec = BridgeCodec::new(64 * 1024);
let hello = UdpHelloPayload { max_bytes: 8, token_present: false };
let payload = serialize_payload(&hello).unwrap();
let frame = Frame { header: FrameHeader::new(MsgKind::Hello, 0, 0, 0, payload.len() as u32), payload };
udp_send(&socket, &bind, frame, &mut codec).await.unwrap();
let ack = udp_recv(&socket, &mut codec, HANDSHAKE_TIMEOUT)
.await
.unwrap()
.expect("expected hello ack");
assert_eq!(ack.0.header.kind, MsgKind::HelloAck);
let init = SessionInitPayload {
total_len: 12,
chunk_bytes: 12,
ttl_ms: 0,
idle_ms: 0,
content_type: "application/octet-stream".to_string(),
meta: Vec::new(),
token: None,
};
let init_payload = serialize_payload(&init).unwrap();
let init_frame = Frame { header: FrameHeader::new(MsgKind::SessionInit, 0, 1, 0, init_payload.len() as u32), payload: init_payload };
udp_send(&socket, &bind, init_frame, &mut codec).await.unwrap();
let _ = udp_recv(&socket, &mut codec, HANDSHAKE_TIMEOUT).await.unwrap();
let data = Frame { header: FrameHeader::new(MsgKind::Data, 0, 1, 0, 12), payload: Bytes::from_static(b"0123456789ab") };
udp_send(&socket, &bind, data, &mut codec).await.unwrap();
if let Some((resp, _)) = udp_recv(&socket, &mut codec, HANDSHAKE_TIMEOUT).await.unwrap() {
assert_eq!(resp.header.kind, MsgKind::Error);
assert_eq!(resp.header.flags & FLAG_SIZE_EXCEEDED, FLAG_SIZE_EXCEEDED);
let resp_msg = String::from_utf8_lossy(&resp.payload);
assert!(resp_msg.contains("size exceeded") || resp_msg.contains("session error"));
}
server.await.unwrap().unwrap();
let clip_val = seen.lock().unwrap().clone().unwrap_or_default();
assert!(clip_val.contains("payload size exceeds"));
}
#[tokio::test]
async fn udp_sessioninit_rejected_on_token_mismatch() {
let bind = UdpSocket::bind("127.0.0.1:0").await.unwrap().local_addr().unwrap();
let server = tokio::spawn(async move {
let socket = UdpSocket::bind(bind).await.unwrap();
let mut codec = BridgeCodec::new(64 * 1024);
let hello = udp_recv(&socket, &mut codec, HANDSHAKE_TIMEOUT).await.unwrap().unwrap();
let ack_payload = serialize_payload(&UdpHelloAckPayload { accepted: true, reason: None }).unwrap();
let ack = Frame { header: FrameHeader::new(MsgKind::HelloAck, 0, 0, 0, ack_payload.len() as u32), payload: ack_payload };
udp_send(&socket, &hello.1, ack, &mut codec).await.unwrap();
let init = udp_recv(&socket, &mut codec, HANDSHAKE_TIMEOUT).await.unwrap().unwrap();
assert_eq!(init.0.header.kind, MsgKind::SessionInit);
let abort = Frame {
header: FrameHeader::new(MsgKind::SessionAbort, FLAG_TOKEN_INVALID, init.0.header.session_id, 0, 0),
payload: Bytes::from_static(b"token required or mismatch"),
};
udp_send(&socket, &init.1, abort, &mut codec).await.unwrap();
});
let client_cfg = ResolvedBridgeConnect {
to: bind.to_string(),
socket: std::path::PathBuf::from("/run/wsl-clip/socket"),
mode: crate::config::BridgeMode::Udp,
token: None,
token_file: None,
max_bytes: 64 * 1024,
};
let err = run_udp_client_stub(client_cfg).await.unwrap_err();
assert!(err.to_string().contains("session error") || err.to_string().contains("retries exceeded"));
server.await.unwrap();
}
#[tokio::test]
async fn udp_multichunk_soak_applies_clipboard() {
let bind_socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let addr = bind_socket.local_addr().unwrap();
drop(bind_socket);
let listener_cfg = ResolvedBridgeListen {
bind: addr.to_string(),
mode: crate::config::BridgeMode::Udp,
idle_exit: Some("2h".into()),
ttl: None,
max_bytes: 1_000_000,
token: None,
token_file: None,
daemon: false,
log_file: None,
};
let seen = Arc::new(Mutex::new(None::<String>));
let seen_clone = seen.clone();
let server = task::spawn(async move {
run_udp_listener_stub_with_clipboard(listener_cfg, move |text| {
*seen_clone.lock().unwrap() = Some(text.to_string());
Ok(())
})
.await
});
sleep(StdDuration::from_millis(50)).await;
let data = vec![b'Z'; 64 * 1024];
let chunk_bytes = 4096u32;
let client_cfg = ResolvedBridgeConnect {
to: addr.to_string(),
socket: std::path::PathBuf::from("/run/wsl-clip/socket"),
mode: crate::config::BridgeMode::Udp,
token: None,
token_file: None,
max_bytes: 1_000_000,
};
let socket = UdpSocket::bind("0.0.0.0:0").await.unwrap();
let mut codec = BridgeCodec::new(client_cfg.max_bytes as u64);
let plan = UdpClientPlan::new_with_payload(&client_cfg, Bytes::from(data.clone()), chunk_bytes).unwrap();
let mut frames = plan.frames.into_iter();
let hello_frame = frames.next().unwrap();
udp_send(&socket, &addr, hello_frame.clone(), &mut codec).await.unwrap();
let mut hello_ok = false;
for _ in 0..3 {
if let Some((ack, _)) = udp_recv(&socket, &mut codec, HANDSHAKE_TIMEOUT).await.unwrap() {
if ack.header.kind == MsgKind::HelloAck {
hello_ok = true;
break;
}
}
udp_send(&socket, &addr, hello_frame.clone(), &mut codec).await.unwrap();
}
assert!(hello_ok, "no hello ack");
let init_frame = frames
.find(|f| f.header.kind == MsgKind::SessionInit)
.expect("plan missing session init");
let data_frames: Vec<Frame> = frames.filter(|f| f.header.kind == MsgKind::Data).collect();
udp_send(&socket, &addr, init_frame, &mut codec).await.unwrap();
let total_chunks = data_frames.len();
let mut acked = vec![false; total_chunks];
let max_retries = 3u32;
let mut attempts = 0u32;
for frame in &data_frames {
udp_send(&socket, &addr, frame.clone(), &mut codec).await.unwrap();
}
while attempts <= max_retries && acked.iter().any(|a| !*a) {
match udp_recv(&socket, &mut codec, HANDSHAKE_TIMEOUT).await.unwrap() {
Some((ackb, _)) if ackb.header.kind == MsgKind::AckBitmap => {
let ack_payload: AckBitmapPayload = deserialize_payload(&ackb.payload).unwrap();
for bit in 0..ack_payload.bitmap.len() * 8 {
let idx = ack_payload.base_chunk as usize + bit;
if idx >= total_chunks {
break;
}
let byte = bit / 8;
let mask = 1u8 << (bit % 8);
if (ack_payload.bitmap[byte] & mask) != 0 {
acked[idx] = true;
}
}
}
_ => { }
}
for idx in 0..total_chunks {
if !acked[idx] {
udp_send(&socket, &addr, data_frames[idx].clone(), &mut codec).await.unwrap();
}
}
attempts += 1;
}
if !acked.iter().all(|a| *a) {
eprintln!("warning: not all chunks acked in soak test; proceeding to wait for clipboard");
}
server.await.unwrap().unwrap();
let mut clip = None;
for _ in 0..20 {
if let Some(val) = seen.lock().unwrap().clone() {
clip = Some(val);
break;
}
sleep(StdDuration::from_millis(50)).await;
}
let clip = clip.expect("clipboard should be applied");
assert_eq!(clip.len(), data.len());
}
#[tokio::test]
async fn udp_daemon_appends_logs() {
let bind = UdpSocket::bind("127.0.0.1:0").await.unwrap().local_addr().unwrap();
let log = NamedTempFile::new().unwrap();
let log_path = log.path().to_path_buf();
let listener_cfg = ResolvedBridgeListen {
bind: bind.to_string(),
mode: crate::config::BridgeMode::Udp,
idle_exit: Some("200ms".into()),
ttl: None,
max_bytes: 64 * 1024,
token: None,
token_file: None,
daemon: true,
log_file: Some(log_path.clone()),
};
let handle = spawn_udp_listener_daemon(listener_cfg);
sleep(StdDuration::from_millis(50)).await;
let client_cfg = ResolvedBridgeConnect {
to: bind.to_string(),
socket: std::path::PathBuf::from("/run/wsl-clip/socket"),
mode: crate::config::BridgeMode::Udp,
token: None,
token_file: None,
max_bytes: 64 * 1024,
};
let _ = run_udp_client_stub(client_cfg).await;
sleep(StdDuration::from_millis(400)).await;
handle.join().unwrap();
let contents = fs::read_to_string(&log_path).unwrap();
assert!(contents.contains("daemon_start"));
assert!(contents.contains("daemon_exit"));
}
#[tokio::test]
async fn udp_end_to_end_mock_clipboard_multichunk() {
let bind_socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let bind = bind_socket.local_addr().unwrap();
drop(bind_socket);
let listener_cfg = ResolvedBridgeListen {
bind: bind.to_string(),
mode: crate::config::BridgeMode::Udp,
idle_exit: Some("5m".into()),
ttl: None,
max_bytes: 512 * 1024,
token: None,
token_file: None,
daemon: false,
log_file: None,
};
let seen = Arc::new(Mutex::new(None::<String>));
let seen_clone = seen.clone();
let server = task::spawn(async move {
run_udp_listener_stub_with_clipboard(listener_cfg, move |text| {
*seen_clone.lock().unwrap() = Some(text.to_string());
Ok(())
})
.await
});
sleep(StdDuration::from_millis(50)).await;
let client_cfg = ResolvedBridgeConnect {
to: bind.to_string(),
socket: std::path::PathBuf::from("/run/wsl-clip/socket"),
mode: crate::config::BridgeMode::Udp,
token: None,
token_file: None,
max_bytes: 512 * 1024,
};
let payload = vec![b'X'; 120 * 1024];
let socket = UdpSocket::bind("0.0.0.0:0").await.unwrap();
let mut codec = BridgeCodec::new(client_cfg.max_bytes as u64);
let plan = UdpClientPlan::new_with_payload(&client_cfg, Bytes::from(payload.clone()), 4096).unwrap();
let mut frames = plan.frames.into_iter();
let hello = frames.next().unwrap();
sleep(StdDuration::from_millis(50)).await; udp_send(&socket, &bind, hello.clone(), &mut codec).await.unwrap();
let mut hello_ok = false;
for _ in 0..6 {
if let Some((ack, _)) = udp_recv(&socket, &mut codec, HANDSHAKE_TIMEOUT).await.unwrap() {
if ack.header.kind == MsgKind::HelloAck {
hello_ok = true;
break;
}
}
sleep(StdDuration::from_millis(20)).await;
udp_send(&socket, &bind, hello.clone(), &mut codec).await.unwrap();
}
assert!(hello_ok, "hello ack missing");
let init = frames.find(|f| f.header.kind == MsgKind::SessionInit).unwrap();
udp_send(&socket, &bind, init, &mut codec).await.unwrap();
let data_frames: Vec<Frame> = frames.filter(|f| f.header.kind == MsgKind::Data).collect();
for f in &data_frames {
udp_send(&socket, &bind, f.clone(), &mut codec).await.unwrap();
}
let total = data_frames.len();
let mut acked = vec![false; total];
let mut attempts = 0u32;
while attempts < (total as u32 + 10) && acked.iter().any(|a| !*a) {
match udp_recv(&socket, &mut codec, HANDSHAKE_TIMEOUT).await.unwrap() {
Some((ackb, _)) if ackb.header.kind == MsgKind::AckBitmap => {
let ack_payload: AckBitmapPayload = deserialize_payload(&ackb.payload).unwrap();
let base = ack_payload.base_chunk as usize;
for i in 0..base.min(total) {
acked[i] = true;
}
for bit in 0..ack_payload.bitmap.len() * 8 {
let idx = base + bit;
if idx >= total {
break;
}
let byte = bit / 8;
let mask = 1u8 << (bit % 8);
if (ack_payload.bitmap[byte] & mask) != 0 {
acked[idx] = true;
}
}
}
_ => { }
}
for i in 0..total {
if !acked[i] {
udp_send(&socket, &bind, data_frames[i].clone(), &mut codec).await.unwrap();
}
}
attempts += 1;
}
if acked.iter().any(|a| !*a) {
eprintln!(
"warning: UDP client still missing chunks after retries: {:?}",
acked
.iter()
.enumerate()
.filter_map(|(i, ok)| (!*ok).then_some(i))
.collect::<Vec<_>>()
);
}
server.await.unwrap().unwrap();
let val = seen.lock().unwrap().clone().expect("clipboard should be set");
assert_eq!(val.len(), payload.len());
}
#[test]
fn udp_client_plan_builds_frames() {
let cfg = ResolvedBridgeConnect {
to: "127.0.0.1:8121".into(),
socket: std::path::PathBuf::from("/run/wsl-clip/socket"),
mode: crate::config::BridgeMode::Udp,
token: None,
token_file: None,
max_bytes: 64 * 1024,
};
let plan = UdpClientPlan::new(&cfg).unwrap();
assert!(plan.frames.len() >= 2);
assert_eq!(plan.frames[0].header.kind, MsgKind::Hello);
assert_eq!(plan.frames[1].header.kind, MsgKind::SessionInit);
assert!(plan.frames.iter().any(|f| f.header.kind == MsgKind::Data));
}
}