1use crate::error::{Error, Result};
18use std::pin::Pin;
19use std::future::Future;
20use std::sync::Arc;
21use tokio::io::{AsyncRead, AsyncWrite, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter};
22use tokio::net::{TcpStream, TcpListener};
23use tokio::sync::Mutex;
24use url::Url;
25
/// Size in bytes of the length prefix that precedes every frame; the prefix is
/// decoded as a big-endian `u32`.
const FRAME_HEADER_SIZE: usize = 4;

/// Largest payload, in bytes, accepted for a single message (16 MiB).
const MAX_MESSAGE_SIZE: usize = 16 * 1024 * 1024;
31
/// A message-oriented connection to a peer.
///
/// The asynchronous operations return boxed futures rather than being declared
/// `async fn`, which keeps the trait usable as `Box<dyn Transport>`.
/// Each returned future borrows `self` for its whole lifetime.
pub trait Transport: Send + Sync {
    /// Sends `data` to the peer as one message.
    fn send(&self, data: &[u8]) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>>;

    /// Waits for the next message from the peer and returns its payload.
    fn recv(&self) -> Pin<Box<dyn Future<Output = Result<Vec<u8>>> + Send + '_>>;

    /// Shuts the transport down.
    fn close(&self) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>>;

    /// Reports whether the transport considers itself connected.
    fn is_connected(&self) -> bool;

    /// Textual form of the local endpoint, when one is known.
    fn local_addr(&self) -> Option<String>;

    /// Textual form of the remote endpoint, when one is known.
    fn peer_addr(&self) -> Option<String>;
}
52
53struct FramedStream<S> {
55 reader: BufReader<tokio::io::ReadHalf<S>>,
56 writer: BufWriter<tokio::io::WriteHalf<S>>,
57}
58
59impl<S: AsyncRead + AsyncWrite + Unpin + Send + 'static> FramedStream<S> {
60 fn new(stream: S) -> Self {
61 let (read_half, write_half) = tokio::io::split(stream);
62 Self {
63 reader: BufReader::new(read_half),
64 writer: BufWriter::new(write_half),
65 }
66 }
67
68 async fn send(&mut self, data: &[u8]) -> Result<()> {
69 if data.len() > MAX_MESSAGE_SIZE {
70 return Err(Error::Transport(format!(
71 "message too large: {} > {}",
72 data.len(),
73 MAX_MESSAGE_SIZE
74 )));
75 }
76
77 let len = data.len() as u32;
79 self.writer.write_all(&len.to_be_bytes()).await?;
80
81 self.writer.write_all(data).await?;
83 self.writer.flush().await?;
84
85 Ok(())
86 }
87
88 async fn recv(&mut self) -> Result<Vec<u8>> {
89 let mut len_buf = [0u8; FRAME_HEADER_SIZE];
91 self.reader.read_exact(&mut len_buf).await?;
92 let len = u32::from_be_bytes(len_buf) as usize;
93
94 if len > MAX_MESSAGE_SIZE {
95 return Err(Error::Transport(format!(
96 "message too large: {} > {}",
97 len, MAX_MESSAGE_SIZE
98 )));
99 }
100
101 let mut data = vec![0u8; len];
103 self.reader.read_exact(&mut data).await?;
104
105 Ok(data)
106 }
107}
108
109pub struct TcpTransport {
111 stream: Arc<Mutex<Option<FramedStream<TcpStream>>>>,
112 local_addr: Option<String>,
113 peer_addr: Option<String>,
114}
115
116impl TcpTransport {
117 pub async fn connect(addr: &str) -> Result<Self> {
119 let stream = TcpStream::connect(addr).await?;
120 stream.set_nodelay(true)?;
121
122 let local_addr = stream.local_addr().ok().map(|a| a.to_string());
123 let peer_addr = stream.peer_addr().ok().map(|a| a.to_string());
124
125 let framed = FramedStream::new(stream);
126
127 Ok(Self {
128 stream: Arc::new(Mutex::new(Some(framed))),
129 local_addr,
130 peer_addr,
131 })
132 }
133
134 pub fn from_stream(stream: TcpStream) -> Self {
136 let local_addr = stream.local_addr().ok().map(|a| a.to_string());
137 let peer_addr = stream.peer_addr().ok().map(|a| a.to_string());
138 let framed = FramedStream::new(stream);
139
140 Self {
141 stream: Arc::new(Mutex::new(Some(framed))),
142 local_addr,
143 peer_addr,
144 }
145 }
146}
147
148impl Transport for TcpTransport {
149 fn send(&self, data: &[u8]) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
150 let data = data.to_vec();
151 Box::pin(async move {
152 let mut guard = self.stream.lock().await;
153 let stream = guard.as_mut()
154 .ok_or_else(|| Error::Transport("connection closed".into()))?;
155 stream.send(&data).await
156 })
157 }
158
159 fn recv(&self) -> Pin<Box<dyn Future<Output = Result<Vec<u8>>> + Send + '_>> {
160 Box::pin(async move {
161 let mut guard = self.stream.lock().await;
162 let stream = guard.as_mut()
163 .ok_or_else(|| Error::Transport("connection closed".into()))?;
164 stream.recv().await
165 })
166 }
167
168 fn close(&self) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
169 Box::pin(async move {
170 let mut guard = self.stream.lock().await;
171 *guard = None;
172 Ok(())
173 })
174 }
175
176 fn is_connected(&self) -> bool {
177 true
179 }
180
181 fn local_addr(&self) -> Option<String> {
182 self.local_addr.clone()
183 }
184
185 fn peer_addr(&self) -> Option<String> {
186 self.peer_addr.clone()
187 }
188}
189
190pub struct TcpTransportListener {
192 listener: TcpListener,
193 local_addr: String,
194}
195
196impl TcpTransportListener {
197 pub async fn bind(addr: &str) -> Result<Self> {
199 let listener = TcpListener::bind(addr).await?;
200 let local_addr = listener.local_addr()?.to_string();
201
202 Ok(Self {
203 listener,
204 local_addr,
205 })
206 }
207
208 pub async fn accept(&self) -> Result<TcpTransport> {
210 let (stream, _addr) = self.listener.accept().await?;
211 stream.set_nodelay(true)?;
212 Ok(TcpTransport::from_stream(stream))
213 }
214
215 pub fn local_addr(&self) -> &str {
217 &self.local_addr
218 }
219}
220
221#[cfg(unix)]
222mod unix_transport {
223 use super::*;
224 use tokio::net::{UnixStream, UnixListener};
225
226 pub struct UnixTransport {
228 stream: Arc<Mutex<Option<FramedStream<UnixStream>>>>,
229 local_addr: Option<String>,
230 peer_addr: Option<String>,
231 }
232
233 impl UnixTransport {
234 pub async fn connect(path: &str) -> Result<Self> {
236 let stream = UnixStream::connect(path).await?;
237 let local_addr = stream.local_addr().ok()
238 .and_then(|a| a.as_pathname().map(|p| p.to_string_lossy().into_owned()));
239 let peer_addr = stream.peer_addr().ok()
240 .and_then(|a| a.as_pathname().map(|p| p.to_string_lossy().into_owned()));
241
242 let framed = FramedStream::new(stream);
243
244 Ok(Self {
245 stream: Arc::new(Mutex::new(Some(framed))),
246 local_addr,
247 peer_addr,
248 })
249 }
250
251 pub fn from_stream(stream: UnixStream) -> Self {
253 let local_addr = stream.local_addr().ok()
254 .and_then(|a| a.as_pathname().map(|p| p.to_string_lossy().into_owned()));
255 let peer_addr = stream.peer_addr().ok()
256 .and_then(|a| a.as_pathname().map(|p| p.to_string_lossy().into_owned()));
257 let framed = FramedStream::new(stream);
258
259 Self {
260 stream: Arc::new(Mutex::new(Some(framed))),
261 local_addr,
262 peer_addr,
263 }
264 }
265 }
266
267 impl Transport for UnixTransport {
268 fn send(&self, data: &[u8]) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
269 let data = data.to_vec();
270 Box::pin(async move {
271 let mut guard = self.stream.lock().await;
272 let stream = guard.as_mut()
273 .ok_or_else(|| Error::Transport("connection closed".into()))?;
274 stream.send(&data).await
275 })
276 }
277
278 fn recv(&self) -> Pin<Box<dyn Future<Output = Result<Vec<u8>>> + Send + '_>> {
279 Box::pin(async move {
280 let mut guard = self.stream.lock().await;
281 let stream = guard.as_mut()
282 .ok_or_else(|| Error::Transport("connection closed".into()))?;
283 stream.recv().await
284 })
285 }
286
287 fn close(&self) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
288 Box::pin(async move {
289 let mut guard = self.stream.lock().await;
290 *guard = None;
291 Ok(())
292 })
293 }
294
295 fn is_connected(&self) -> bool {
296 true
297 }
298
299 fn local_addr(&self) -> Option<String> {
300 self.local_addr.clone()
301 }
302
303 fn peer_addr(&self) -> Option<String> {
304 self.peer_addr.clone()
305 }
306 }
307
308 pub struct UnixTransportListener {
310 listener: UnixListener,
311 path: String,
312 }
313
314 impl UnixTransportListener {
315 pub async fn bind(path: &str) -> Result<Self> {
317 let _ = std::fs::remove_file(path);
319 let listener = UnixListener::bind(path)?;
320
321 Ok(Self {
322 listener,
323 path: path.to_string(),
324 })
325 }
326
327 pub async fn accept(&self) -> Result<UnixTransport> {
329 let (stream, _addr) = self.listener.accept().await?;
330 Ok(UnixTransport::from_stream(stream))
331 }
332
333 pub fn path(&self) -> &str {
335 &self.path
336 }
337 }
338
339 impl Drop for UnixTransportListener {
340 fn drop(&mut self) {
341 let _ = std::fs::remove_file(&self.path);
342 }
343 }
344}
345
346#[cfg(unix)]
347pub use unix_transport::{UnixTransport, UnixTransportListener};
348
349pub struct WebSocketTransport {
351 ws: Arc<Mutex<Option<WebSocketStream>>>,
352 local_addr: Option<String>,
353 peer_addr: Option<String>,
354}
355
356struct WebSocketStream {
357 inner: tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<TcpStream>>,
358}
359
360impl WebSocketTransport {
361 pub async fn connect(url: &str) -> Result<Self> {
363 use tokio_tungstenite::connect_async;
364
365 let (ws_stream, _response) = connect_async(url).await
366 .map_err(|e| Error::Transport(format!("WebSocket connect failed: {}", e)))?;
367
368 Ok(Self {
369 ws: Arc::new(Mutex::new(Some(WebSocketStream { inner: ws_stream }))),
370 local_addr: None,
371 peer_addr: Some(url.to_string()),
372 })
373 }
374}
375
376impl Transport for WebSocketTransport {
377 fn send(&self, data: &[u8]) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
378 use futures::SinkExt;
379 use tokio_tungstenite::tungstenite::Message;
380
381 let data = data.to_vec();
382 Box::pin(async move {
383 let mut guard = self.ws.lock().await;
384 let ws = guard.as_mut()
385 .ok_or_else(|| Error::Transport("connection closed".into()))?;
386 ws.inner.send(Message::Binary(data.into())).await
387 .map_err(|e| Error::Transport(format!("WebSocket send failed: {}", e)))
388 })
389 }
390
391 fn recv(&self) -> Pin<Box<dyn Future<Output = Result<Vec<u8>>> + Send + '_>> {
392 use futures::StreamExt;
393 use tokio_tungstenite::tungstenite::Message;
394
395 Box::pin(async move {
396 let mut guard = self.ws.lock().await;
397 let ws = guard.as_mut()
398 .ok_or_else(|| Error::Transport("connection closed".into()))?;
399
400 loop {
401 match ws.inner.next().await {
402 Some(Ok(Message::Binary(data))) => return Ok(data.to_vec()),
403 Some(Ok(Message::Text(text))) => return Ok(text.into_bytes()),
404 Some(Ok(Message::Ping(_))) | Some(Ok(Message::Pong(_))) => continue,
405 Some(Ok(Message::Close(_))) => return Err(Error::Transport("connection closed".into())),
406 Some(Ok(Message::Frame(_))) => continue,
407 Some(Err(e)) => return Err(Error::Transport(format!("WebSocket recv failed: {}", e))),
408 None => return Err(Error::Transport("connection closed".into())),
409 }
410 }
411 })
412 }
413
414 fn close(&self) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
415 use futures::SinkExt;
416 use tokio_tungstenite::tungstenite::Message;
417
418 Box::pin(async move {
419 let mut guard = self.ws.lock().await;
420 if let Some(ws) = guard.as_mut() {
421 let _ = ws.inner.send(Message::Close(None)).await;
422 }
423 *guard = None;
424 Ok(())
425 })
426 }
427
428 fn is_connected(&self) -> bool {
429 true
430 }
431
432 fn local_addr(&self) -> Option<String> {
433 self.local_addr.clone()
434 }
435
436 fn peer_addr(&self) -> Option<String> {
437 self.peer_addr.clone()
438 }
439}
440
441pub struct UdpTransport {
446 socket: Arc<tokio::net::UdpSocket>,
447 peer_addr: Option<std::net::SocketAddr>,
448 local_addr: String,
449}
450
451impl UdpTransport {
452 pub async fn bind(addr: &str) -> Result<Self> {
454 let socket = tokio::net::UdpSocket::bind(addr).await?;
455 let local_addr = socket.local_addr()?.to_string();
456
457 Ok(Self {
458 socket: Arc::new(socket),
459 peer_addr: None,
460 local_addr,
461 })
462 }
463
464 pub async fn connect(local_addr: &str, peer_addr: &str) -> Result<Self> {
466 let socket = tokio::net::UdpSocket::bind(local_addr).await?;
467 let peer: std::net::SocketAddr = peer_addr.parse()
468 .map_err(|e| Error::Transport(format!("invalid peer address: {}", e)))?;
469 socket.connect(peer).await?;
470 let local = socket.local_addr()?.to_string();
471
472 Ok(Self {
473 socket: Arc::new(socket),
474 peer_addr: Some(peer),
475 local_addr: local,
476 })
477 }
478
479 pub async fn send_to(&self, data: &[u8], addr: &str) -> Result<()> {
481 let peer: std::net::SocketAddr = addr.parse()
482 .map_err(|e| Error::Transport(format!("invalid address: {}", e)))?;
483
484 if data.len() > MAX_MESSAGE_SIZE {
485 return Err(Error::Transport(format!(
486 "datagram too large: {} > {}",
487 data.len(),
488 MAX_MESSAGE_SIZE
489 )));
490 }
491
492 self.socket.send_to(data, peer).await?;
493 Ok(())
494 }
495
496 pub async fn recv_from(&self) -> Result<(Vec<u8>, std::net::SocketAddr)> {
498 let mut buf = vec![0u8; MAX_MESSAGE_SIZE];
499 let (len, addr) = self.socket.recv_from(&mut buf).await?;
500 buf.truncate(len);
501 Ok((buf, addr))
502 }
503}
504
505impl Transport for UdpTransport {
506 fn send(&self, data: &[u8]) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
507 let data = data.to_vec();
508 Box::pin(async move {
509 if data.len() > MAX_MESSAGE_SIZE {
510 return Err(Error::Transport(format!(
511 "datagram too large: {} > {}",
512 data.len(),
513 MAX_MESSAGE_SIZE
514 )));
515 }
516
517 self.socket.send(&data).await?;
519 Ok(())
520 })
521 }
522
523 fn recv(&self) -> Pin<Box<dyn Future<Output = Result<Vec<u8>>> + Send + '_>> {
524 Box::pin(async move {
525 let mut buf = vec![0u8; MAX_MESSAGE_SIZE];
526 let len = self.socket.recv(&mut buf).await?;
527 buf.truncate(len);
528 Ok(buf)
529 })
530 }
531
532 fn close(&self) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
533 Box::pin(async { Ok(()) })
535 }
536
537 fn is_connected(&self) -> bool {
538 self.peer_addr.is_some()
539 }
540
541 fn local_addr(&self) -> Option<String> {
542 Some(self.local_addr.clone())
543 }
544
545 fn peer_addr(&self) -> Option<String> {
546 self.peer_addr.map(|a| a.to_string())
547 }
548}
549
550pub struct StdioTransport {
554 child: Arc<Mutex<Option<tokio::process::Child>>>,
555 stdin: Arc<Mutex<Option<tokio::process::ChildStdin>>>,
556 stdout: Arc<Mutex<Option<BufReader<tokio::process::ChildStdout>>>>,
557 command: String,
558}
559
560impl StdioTransport {
561 pub async fn spawn(command: &str, args: &[&str]) -> Result<Self> {
563 use tokio::process::Command;
564
565 let mut child = Command::new(command)
566 .args(args)
567 .stdin(std::process::Stdio::piped())
568 .stdout(std::process::Stdio::piped())
569 .stderr(std::process::Stdio::inherit())
570 .spawn()
571 .map_err(|e| Error::Transport(format!("failed to spawn process: {}", e)))?;
572
573 let stdin = child.stdin.take()
574 .ok_or_else(|| Error::Transport("failed to capture stdin".into()))?;
575 let stdout = child.stdout.take()
576 .ok_or_else(|| Error::Transport("failed to capture stdout".into()))?;
577
578 Ok(Self {
579 child: Arc::new(Mutex::new(Some(child))),
580 stdin: Arc::new(Mutex::new(Some(stdin))),
581 stdout: Arc::new(Mutex::new(Some(BufReader::new(stdout)))),
582 command: command.to_string(),
583 })
584 }
585
586 pub async fn from_url(url: &Url) -> Result<Self> {
588 let command = url.path();
589 if command.is_empty() {
590 return Err(Error::Transport("stdio URL must specify command path".into()));
591 }
592
593 let args: Vec<&str> = url.query()
595 .map(|q| q.split('&').collect())
596 .unwrap_or_default();
597
598 Self::spawn(command, &args).await
599 }
600}
601
602impl Transport for StdioTransport {
603 fn send(&self, data: &[u8]) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
604 let data = data.to_vec();
605 Box::pin(async move {
606 let mut guard = self.stdin.lock().await;
607 let stdin = guard.as_mut()
608 .ok_or_else(|| Error::Transport("stdin closed".into()))?;
609
610 if data.len() > MAX_MESSAGE_SIZE {
611 return Err(Error::Transport(format!(
612 "message too large: {} > {}",
613 data.len(),
614 MAX_MESSAGE_SIZE
615 )));
616 }
617
618 let len = data.len() as u32;
620 stdin.write_all(&len.to_be_bytes()).await?;
621 stdin.write_all(&data).await?;
622 stdin.flush().await?;
623
624 Ok(())
625 })
626 }
627
628 fn recv(&self) -> Pin<Box<dyn Future<Output = Result<Vec<u8>>> + Send + '_>> {
629 Box::pin(async move {
630 let mut guard = self.stdout.lock().await;
631 let stdout = guard.as_mut()
632 .ok_or_else(|| Error::Transport("stdout closed".into()))?;
633
634 let mut len_buf = [0u8; FRAME_HEADER_SIZE];
636 stdout.read_exact(&mut len_buf).await?;
637 let len = u32::from_be_bytes(len_buf) as usize;
638
639 if len > MAX_MESSAGE_SIZE {
640 return Err(Error::Transport(format!(
641 "message too large: {} > {}",
642 len, MAX_MESSAGE_SIZE
643 )));
644 }
645
646 let mut data = vec![0u8; len];
648 stdout.read_exact(&mut data).await?;
649
650 Ok(data)
651 })
652 }
653
654 fn close(&self) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
655 Box::pin(async move {
656 {
658 let mut guard = self.stdin.lock().await;
659 *guard = None;
660 }
661
662 let mut guard = self.child.lock().await;
664 if let Some(mut child) = guard.take() {
665 let _ = child.wait().await;
666 }
667
668 Ok(())
669 })
670 }
671
672 fn is_connected(&self) -> bool {
673 true
674 }
675
676 fn local_addr(&self) -> Option<String> {
677 Some(format!("stdio://{}", self.command))
678 }
679
680 fn peer_addr(&self) -> Option<String> {
681 Some(format!("stdio://{}", self.command))
682 }
683}
684
/// Transport that sends messages with HTTP POST and receives them from a
/// server-sent-events stream.
///
/// Outgoing messages go to `<base_url>/message`; incoming events are read from
/// `<base_url>/sse` by a background task and queued until `recv` collects them.
#[cfg(feature = "mcp")]
pub struct HttpSseTransport {
    client: reqwest::Client,
    /// Base URL with any trailing `/` removed.
    base_url: String,
    /// Events received by the listener task, oldest first.
    recv_buffer: Arc<Mutex<std::collections::VecDeque<Vec<u8>>>>,
    /// Cleared by `close`; the listener task stops once it observes `false`.
    connected: Arc<std::sync::atomic::AtomicBool>,
}

#[cfg(feature = "mcp")]
impl HttpSseTransport {
    /// Builds the HTTP client (30-second request timeout) for `base_url`.
    ///
    /// No request is made here; call [`start_sse_listener`](Self::start_sse_listener)
    /// to begin receiving.
    pub async fn connect(base_url: &str) -> Result<Self> {
        let client = reqwest::Client::builder()
            .timeout(std::time::Duration::from_secs(30))
            .build()
            .map_err(|e| Error::Transport(format!("failed to create HTTP client: {}", e)))?;

        Ok(Self {
            client,
            base_url: base_url.trim_end_matches('/').to_string(),
            recv_buffer: Arc::new(Mutex::new(std::collections::VecDeque::new())),
            connected: Arc::new(std::sync::atomic::AtomicBool::new(true)),
        })
    }

    /// Consumes every complete line currently in `pending` and returns the
    /// events those lines complete.
    ///
    /// Bytes after the last newline stay in `pending` so that a line (or a
    /// multi-byte character) split across network chunks is reassembled before
    /// it is parsed. `event_data` carries the payload of an event whose
    /// terminating blank line has not arrived yet.
    fn drain_sse_events(pending: &mut Vec<u8>, event_data: &mut String) -> Vec<Vec<u8>> {
        let mut events = Vec::new();

        while let Some(newline) = pending.iter().position(|&b| b == b'\n') {
            let raw: Vec<u8> = pending.drain(..=newline).collect();
            let text = String::from_utf8_lossy(&raw[..newline]);
            // Accept both "\n" and "\r\n" line endings.
            let line = text.strip_suffix('\r').unwrap_or(&*text);

            if let Some(payload) = line.strip_prefix("data: ") {
                event_data.push_str(payload);
            } else if line.is_empty() && !event_data.is_empty() {
                // A blank line terminates the event.
                events.push(std::mem::take(event_data).into_bytes());
            }
        }

        events
    }

    /// Spawns the background task that reads `<base_url>/sse` and queues each
    /// event's data for `recv`.
    ///
    /// The task reconnects after a failed request or a finished stream, always
    /// waiting one second first so that a server which closes immediately is
    /// not hammered. It stops once `close` has been called.
    pub async fn start_sse_listener(&self) -> Result<()> {
        use futures::StreamExt;
        use std::sync::atomic::Ordering;

        let url = format!("{}/sse", self.base_url);
        let buffer = Arc::clone(&self.recv_buffer);
        let connected = Arc::clone(&self.connected);
        let client = self.client.clone();

        tokio::spawn(async move {
            while connected.load(Ordering::Relaxed) {
                if let Ok(response) = client.get(&url).send().await {
                    let mut stream = response.bytes_stream();
                    // Parser state is per connection: a reconnect starts clean.
                    let mut pending: Vec<u8> = Vec::new();
                    let mut event_data = String::new();

                    // A chunk error ends this connection, just like end-of-stream.
                    while let Some(Ok(chunk)) = stream.next().await {
                        pending.extend_from_slice(&chunk);
                        let events = Self::drain_sse_events(&mut pending, &mut event_data);
                        if !events.is_empty() {
                            buffer.lock().await.extend(events);
                        }
                    }
                }

                tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
            }
        });

        Ok(())
    }
}

#[cfg(feature = "mcp")]
impl Transport for HttpSseTransport {
    /// POSTs `data` to `<base_url>/message` with a JSON content type.
    ///
    /// Fails with `Error::Transport` when the request fails or the server
    /// answers with an error status.
    fn send(&self, data: &[u8]) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
        let data = data.to_vec();
        let client = self.client.clone();
        let url = format!("{}/message", self.base_url);

        Box::pin(async move {
            client
                .post(&url)
                .header("Content-Type", "application/json")
                .body(data)
                .send()
                .await
                .map_err(|e| Error::Transport(format!("HTTP POST failed: {}", e)))?
                .error_for_status()
                .map_err(|e| Error::Transport(format!("HTTP error: {}", e)))?;

            Ok(())
        })
    }

    /// Returns the oldest queued event, polling every 10 ms until one arrives.
    ///
    /// Events that were queued before `close` are still delivered; once the
    /// queue is empty and the transport is closed this fails with
    /// `Error::Transport("connection closed")` instead of waiting forever.
    fn recv(&self) -> Pin<Box<dyn Future<Output = Result<Vec<u8>>> + Send + '_>> {
        let buffer = Arc::clone(&self.recv_buffer);
        let connected = Arc::clone(&self.connected);

        Box::pin(async move {
            loop {
                // The lock guard is a temporary of this statement, so it is
                // released before the sleep below.
                let next = buffer.lock().await.pop_front();
                if let Some(message) = next {
                    return Ok(message);
                }
                if !connected.load(std::sync::atomic::Ordering::Relaxed) {
                    return Err(Error::Transport("connection closed".into()));
                }
                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
            }
        })
    }

    /// Marks the transport closed, which stops the listener task and makes
    /// `recv` fail once the queue is drained.
    fn close(&self) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
        let connected = Arc::clone(&self.connected);
        Box::pin(async move {
            connected.store(false, std::sync::atomic::Ordering::Relaxed);
            Ok(())
        })
    }

    /// `true` until `close` is called.
    fn is_connected(&self) -> bool {
        self.connected.load(std::sync::atomic::Ordering::Relaxed)
    }

    /// Always `None`; no local address is tracked for HTTP.
    fn local_addr(&self) -> Option<String> {
        None
    }

    /// The base URL this transport talks to.
    fn peer_addr(&self) -> Option<String> {
        Some(self.base_url.clone())
    }
}
827
828pub async fn connect(url: &str) -> Result<Box<dyn Transport>> {
838 let parsed = Url::parse(url)?;
839
840 match parsed.scheme() {
841 "zap" | "zap+tcp" | "tcp" => {
842 let host = parsed.host_str().unwrap_or("localhost");
843 let port = parsed.port().unwrap_or(crate::DEFAULT_PORT);
844 let addr = format!("{}:{}", host, port);
845 let transport = TcpTransport::connect(&addr).await?;
846 Ok(Box::new(transport))
847 }
848 #[cfg(unix)]
849 "zap+unix" | "unix" => {
850 let path = parsed.path();
851 let transport = UnixTransport::connect(path).await?;
852 Ok(Box::new(transport))
853 }
854 "ws" | "wss" => {
855 let transport = WebSocketTransport::connect(url).await?;
856 Ok(Box::new(transport))
857 }
858 "stdio" => {
859 let transport = StdioTransport::from_url(&parsed).await?;
861 Ok(Box::new(transport))
862 }
863 #[cfg(feature = "mcp")]
864 "http" | "https" => {
865 let transport = HttpSseTransport::connect(url).await?;
867 transport.start_sse_listener().await?;
868 Ok(Box::new(transport))
869 }
870 #[cfg(not(feature = "mcp"))]
871 "http" | "https" => {
872 Err(Error::Transport(
873 "HTTP/SSE transport requires 'mcp' feature".into()
874 ))
875 }
876 "udp" => {
877 let host = parsed.host_str().unwrap_or("127.0.0.1");
879 let port = parsed.port().unwrap_or(crate::DEFAULT_PORT);
880 let peer_addr = format!("{}:{}", host, port);
881 let transport = UdpTransport::connect("0.0.0.0:0", &peer_addr).await?;
882 Ok(Box::new(transport))
883 }
884 _ => Err(Error::Transport(format!(
885 "unsupported URL scheme: {}",
886 parsed.scheme()
887 ))),
888 }
889}
890
#[cfg(test)]
mod tests {
    use super::*;

    /// A frame sent over TCP is echoed back unchanged.
    #[tokio::test]
    async fn test_tcp_transport_roundtrip() {
        // Port 0 lets the OS pick a free port.
        let listener = TcpTransportListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().to_string();

        // Echo server: accept one connection and bounce one message.
        let server_task = tokio::spawn(async move {
            let transport = listener.accept().await.unwrap();
            let msg = transport.recv().await.unwrap();
            transport.send(&msg).await.unwrap();
        });

        let client = TcpTransport::connect(&addr).await.unwrap();

        let test_msg = b"Hello, ZAP!";
        client.send(test_msg).await.unwrap();
        let response = client.recv().await.unwrap();

        assert_eq!(response, test_msg);

        client.close().await.unwrap();
        server_task.await.unwrap();
    }

    /// A `zap://` URL is routed to the TCP transport.
    ///
    /// NOTE(review): this relies on nothing listening on localhost:9999.
    #[tokio::test]
    async fn test_connect_tcp_url() {
        let result = connect("zap://localhost:9999").await;
        assert!(result.is_err());
    }

    /// An unknown scheme is rejected with a descriptive transport error.
    #[tokio::test]
    async fn test_connect_invalid_scheme() {
        let result = connect("ftp://localhost:9999").await;
        // Assert the variant as well as the message, so the test cannot pass
        // on an unrelated error.
        assert!(matches!(
            &result,
            Err(Error::Transport(msg)) if msg.contains("unsupported")
        ));
    }

    /// A frame sent over a Unix socket is echoed back unchanged.
    #[cfg(unix)]
    #[tokio::test]
    async fn test_unix_transport_roundtrip() {
        use std::env::temp_dir;

        // Include the process id so parallel test runs do not collide.
        let socket_path = temp_dir().join(format!("zap_test_{}.sock", std::process::id()));
        let socket_str = socket_path.to_str().unwrap().to_string();

        let listener = UnixTransportListener::bind(&socket_str).await.unwrap();

        // Echo server: accept one connection and bounce one message.
        let server_task = tokio::spawn(async move {
            let transport = listener.accept().await.unwrap();
            let msg = transport.recv().await.unwrap();
            transport.send(&msg).await.unwrap();
        });

        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;

        let client = UnixTransport::connect(&socket_str).await.unwrap();

        let test_msg = b"Unix socket test!";
        client.send(test_msg).await.unwrap();
        let response = client.recv().await.unwrap();

        assert_eq!(response, test_msg);

        client.close().await.unwrap();
        server_task.await.unwrap();
    }

    /// Datagrams flow in both directions between a connected client and an
    /// unconnected server.
    #[tokio::test]
    async fn test_udp_transport_roundtrip() {
        let server = UdpTransport::bind("127.0.0.1:0").await.unwrap();
        let server_addr = server.local_addr().unwrap();

        let client = UdpTransport::connect("127.0.0.1:0", &server_addr).await.unwrap();
        let client_addr = client.local_addr().unwrap();

        let test_msg = b"UDP test message";
        client.send(test_msg).await.unwrap();

        let (received, sender) = server.recv_from().await.unwrap();
        assert_eq!(&received, test_msg);
        assert_eq!(sender.to_string(), client_addr);

        server.send_to(b"response", &client_addr).await.unwrap();

        let (response, _) = client.recv_from().await.unwrap();
        assert_eq!(&response, b"response");
    }

    /// Only a socket created with `connect` reports itself as connected.
    #[tokio::test]
    async fn test_udp_transport_connected_mode() {
        let receiver = UdpTransport::bind("127.0.0.1:0").await.unwrap();
        let recv_addr = receiver.local_addr().unwrap();

        let sender = UdpTransport::connect("127.0.0.1:0", &recv_addr).await.unwrap();

        assert!(sender.is_connected());
        assert!(!receiver.is_connected());
    }

    /// A `udp://` URL yields a connected UDP transport.
    #[tokio::test]
    async fn test_connect_udp_url() {
        let result = connect("udp://127.0.0.1:5555").await;
        assert!(result.is_ok());

        let transport = result.unwrap();
        assert!(transport.is_connected());
        assert!(transport.peer_addr().is_some());
    }
}