Skip to main content

zap/
transport.rs

1//! Transport implementations for ZAP
2//!
3//! Provides transport layer abstractions for ZAP protocol communication.
4//! Supports TCP, Unix sockets, WebSocket, and encrypted channels.
5//!
6//! # Example
7//!
8//! ```rust,ignore
9//! use zap::transport::{TcpTransport, connect};
10//!
11//! // Connect via TCP
12//! let transport = connect("zap://localhost:9999").await?;
13//! transport.send(b"hello").await?;
14//! let response = transport.recv().await?;
15//! ```
16
17use crate::error::{Error, Result};
18use std::pin::Pin;
19use std::future::Future;
20use std::sync::Arc;
21use tokio::io::{AsyncRead, AsyncWrite, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter};
22use tokio::net::{TcpStream, TcpListener};
23use tokio::sync::Mutex;
24use url::Url;
25
26/// Frame header size (4 bytes for length)
27const FRAME_HEADER_SIZE: usize = 4;
28
29/// Maximum message size (16 MB)
30const MAX_MESSAGE_SIZE: usize = 16 * 1024 * 1024;
31
/// Transport abstraction shared by every ZAP connection type in this module.
///
/// The asynchronous operations return boxed `Send` futures that borrow `self`
/// rather than using `async fn`, so the trait can be used as a trait object
/// (the module-level `connect` function returns `Box<dyn Transport>`).
pub trait Transport: Send + Sync {
    /// Send one message to the peer.
    ///
    /// How message boundaries are preserved (length prefix, WebSocket frame,
    /// datagram, ...) is decided by the implementation.
    fn send(&self, data: &[u8]) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>>;

    /// Receive the next complete message from the peer as an owned buffer.
    fn recv(&self) -> Pin<Box<dyn Future<Output = Result<Vec<u8>>> + Send + '_>>;

    /// Close the transport and release whatever the implementation holds.
    fn close(&self) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>>;

    /// Report whether the transport considers itself connected.
    ///
    /// This is a synchronous, local check; implementations do not probe the peer.
    fn is_connected(&self) -> bool;

    /// Local endpoint as a display string, or `None` when there is none.
    fn local_addr(&self) -> Option<String>;

    /// Remote endpoint as a display string, or `None` when there is none.
    fn peer_addr(&self) -> Option<String>;
}
52
53/// Framed stream wrapper for length-prefixed messages
54struct FramedStream<S> {
55    reader: BufReader<tokio::io::ReadHalf<S>>,
56    writer: BufWriter<tokio::io::WriteHalf<S>>,
57}
58
59impl<S: AsyncRead + AsyncWrite + Unpin + Send + 'static> FramedStream<S> {
60    fn new(stream: S) -> Self {
61        let (read_half, write_half) = tokio::io::split(stream);
62        Self {
63            reader: BufReader::new(read_half),
64            writer: BufWriter::new(write_half),
65        }
66    }
67
68    async fn send(&mut self, data: &[u8]) -> Result<()> {
69        if data.len() > MAX_MESSAGE_SIZE {
70            return Err(Error::Transport(format!(
71                "message too large: {} > {}",
72                data.len(),
73                MAX_MESSAGE_SIZE
74            )));
75        }
76
77        // Write length prefix (big-endian)
78        let len = data.len() as u32;
79        self.writer.write_all(&len.to_be_bytes()).await?;
80
81        // Write data
82        self.writer.write_all(data).await?;
83        self.writer.flush().await?;
84
85        Ok(())
86    }
87
88    async fn recv(&mut self) -> Result<Vec<u8>> {
89        // Read length prefix
90        let mut len_buf = [0u8; FRAME_HEADER_SIZE];
91        self.reader.read_exact(&mut len_buf).await?;
92        let len = u32::from_be_bytes(len_buf) as usize;
93
94        if len > MAX_MESSAGE_SIZE {
95            return Err(Error::Transport(format!(
96                "message too large: {} > {}",
97                len, MAX_MESSAGE_SIZE
98            )));
99        }
100
101        // Read message
102        let mut data = vec![0u8; len];
103        self.reader.read_exact(&mut data).await?;
104
105        Ok(data)
106    }
107}
108
109/// TCP transport implementation
110pub struct TcpTransport {
111    stream: Arc<Mutex<Option<FramedStream<TcpStream>>>>,
112    local_addr: Option<String>,
113    peer_addr: Option<String>,
114}
115
116impl TcpTransport {
117    /// Connect to a TCP address
118    pub async fn connect(addr: &str) -> Result<Self> {
119        let stream = TcpStream::connect(addr).await?;
120        stream.set_nodelay(true)?;
121
122        let local_addr = stream.local_addr().ok().map(|a| a.to_string());
123        let peer_addr = stream.peer_addr().ok().map(|a| a.to_string());
124
125        let framed = FramedStream::new(stream);
126
127        Ok(Self {
128            stream: Arc::new(Mutex::new(Some(framed))),
129            local_addr,
130            peer_addr,
131        })
132    }
133
134    /// Create from existing stream (for server-side connections)
135    pub fn from_stream(stream: TcpStream) -> Self {
136        let local_addr = stream.local_addr().ok().map(|a| a.to_string());
137        let peer_addr = stream.peer_addr().ok().map(|a| a.to_string());
138        let framed = FramedStream::new(stream);
139
140        Self {
141            stream: Arc::new(Mutex::new(Some(framed))),
142            local_addr,
143            peer_addr,
144        }
145    }
146}
147
148impl Transport for TcpTransport {
149    fn send(&self, data: &[u8]) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
150        let data = data.to_vec();
151        Box::pin(async move {
152            let mut guard = self.stream.lock().await;
153            let stream = guard.as_mut()
154                .ok_or_else(|| Error::Transport("connection closed".into()))?;
155            stream.send(&data).await
156        })
157    }
158
159    fn recv(&self) -> Pin<Box<dyn Future<Output = Result<Vec<u8>>> + Send + '_>> {
160        Box::pin(async move {
161            let mut guard = self.stream.lock().await;
162            let stream = guard.as_mut()
163                .ok_or_else(|| Error::Transport("connection closed".into()))?;
164            stream.recv().await
165        })
166    }
167
168    fn close(&self) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
169        Box::pin(async move {
170            let mut guard = self.stream.lock().await;
171            *guard = None;
172            Ok(())
173        })
174    }
175
176    fn is_connected(&self) -> bool {
177        // Can't easily check without trying - just check if we have a stream
178        true
179    }
180
181    fn local_addr(&self) -> Option<String> {
182        self.local_addr.clone()
183    }
184
185    fn peer_addr(&self) -> Option<String> {
186        self.peer_addr.clone()
187    }
188}
189
190/// TCP listener for accepting connections
191pub struct TcpTransportListener {
192    listener: TcpListener,
193    local_addr: String,
194}
195
196impl TcpTransportListener {
197    /// Bind to a TCP address
198    pub async fn bind(addr: &str) -> Result<Self> {
199        let listener = TcpListener::bind(addr).await?;
200        let local_addr = listener.local_addr()?.to_string();
201
202        Ok(Self {
203            listener,
204            local_addr,
205        })
206    }
207
208    /// Accept a new connection
209    pub async fn accept(&self) -> Result<TcpTransport> {
210        let (stream, _addr) = self.listener.accept().await?;
211        stream.set_nodelay(true)?;
212        Ok(TcpTransport::from_stream(stream))
213    }
214
215    /// Get local address
216    pub fn local_addr(&self) -> &str {
217        &self.local_addr
218    }
219}
220
221#[cfg(unix)]
222mod unix_transport {
223    use super::*;
224    use tokio::net::{UnixStream, UnixListener};
225
226    /// Unix socket transport
227    pub struct UnixTransport {
228        stream: Arc<Mutex<Option<FramedStream<UnixStream>>>>,
229        local_addr: Option<String>,
230        peer_addr: Option<String>,
231    }
232
233    impl UnixTransport {
234        /// Connect to a Unix socket
235        pub async fn connect(path: &str) -> Result<Self> {
236            let stream = UnixStream::connect(path).await?;
237            let local_addr = stream.local_addr().ok()
238                .and_then(|a| a.as_pathname().map(|p| p.to_string_lossy().into_owned()));
239            let peer_addr = stream.peer_addr().ok()
240                .and_then(|a| a.as_pathname().map(|p| p.to_string_lossy().into_owned()));
241
242            let framed = FramedStream::new(stream);
243
244            Ok(Self {
245                stream: Arc::new(Mutex::new(Some(framed))),
246                local_addr,
247                peer_addr,
248            })
249        }
250
251        /// Create from existing stream
252        pub fn from_stream(stream: UnixStream) -> Self {
253            let local_addr = stream.local_addr().ok()
254                .and_then(|a| a.as_pathname().map(|p| p.to_string_lossy().into_owned()));
255            let peer_addr = stream.peer_addr().ok()
256                .and_then(|a| a.as_pathname().map(|p| p.to_string_lossy().into_owned()));
257            let framed = FramedStream::new(stream);
258
259            Self {
260                stream: Arc::new(Mutex::new(Some(framed))),
261                local_addr,
262                peer_addr,
263            }
264        }
265    }
266
267    impl Transport for UnixTransport {
268        fn send(&self, data: &[u8]) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
269            let data = data.to_vec();
270            Box::pin(async move {
271                let mut guard = self.stream.lock().await;
272                let stream = guard.as_mut()
273                    .ok_or_else(|| Error::Transport("connection closed".into()))?;
274                stream.send(&data).await
275            })
276        }
277
278        fn recv(&self) -> Pin<Box<dyn Future<Output = Result<Vec<u8>>> + Send + '_>> {
279            Box::pin(async move {
280                let mut guard = self.stream.lock().await;
281                let stream = guard.as_mut()
282                    .ok_or_else(|| Error::Transport("connection closed".into()))?;
283                stream.recv().await
284            })
285        }
286
287        fn close(&self) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
288            Box::pin(async move {
289                let mut guard = self.stream.lock().await;
290                *guard = None;
291                Ok(())
292            })
293        }
294
295        fn is_connected(&self) -> bool {
296            true
297        }
298
299        fn local_addr(&self) -> Option<String> {
300            self.local_addr.clone()
301        }
302
303        fn peer_addr(&self) -> Option<String> {
304            self.peer_addr.clone()
305        }
306    }
307
308    /// Unix socket listener
309    pub struct UnixTransportListener {
310        listener: UnixListener,
311        path: String,
312    }
313
314    impl UnixTransportListener {
315        /// Bind to a Unix socket path
316        pub async fn bind(path: &str) -> Result<Self> {
317            // Remove existing socket file if present
318            let _ = std::fs::remove_file(path);
319            let listener = UnixListener::bind(path)?;
320
321            Ok(Self {
322                listener,
323                path: path.to_string(),
324            })
325        }
326
327        /// Accept a new connection
328        pub async fn accept(&self) -> Result<UnixTransport> {
329            let (stream, _addr) = self.listener.accept().await?;
330            Ok(UnixTransport::from_stream(stream))
331        }
332
333        /// Get socket path
334        pub fn path(&self) -> &str {
335            &self.path
336        }
337    }
338
339    impl Drop for UnixTransportListener {
340        fn drop(&mut self) {
341            let _ = std::fs::remove_file(&self.path);
342        }
343    }
344}
345
346#[cfg(unix)]
347pub use unix_transport::{UnixTransport, UnixTransportListener};
348
349/// WebSocket transport
350pub struct WebSocketTransport {
351    ws: Arc<Mutex<Option<WebSocketStream>>>,
352    local_addr: Option<String>,
353    peer_addr: Option<String>,
354}
355
356struct WebSocketStream {
357    inner: tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<TcpStream>>,
358}
359
360impl WebSocketTransport {
361    /// Connect to a WebSocket URL
362    pub async fn connect(url: &str) -> Result<Self> {
363        use tokio_tungstenite::connect_async;
364
365        let (ws_stream, _response) = connect_async(url).await
366            .map_err(|e| Error::Transport(format!("WebSocket connect failed: {}", e)))?;
367
368        Ok(Self {
369            ws: Arc::new(Mutex::new(Some(WebSocketStream { inner: ws_stream }))),
370            local_addr: None,
371            peer_addr: Some(url.to_string()),
372        })
373    }
374}
375
376impl Transport for WebSocketTransport {
377    fn send(&self, data: &[u8]) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
378        use futures::SinkExt;
379        use tokio_tungstenite::tungstenite::Message;
380
381        let data = data.to_vec();
382        Box::pin(async move {
383            let mut guard = self.ws.lock().await;
384            let ws = guard.as_mut()
385                .ok_or_else(|| Error::Transport("connection closed".into()))?;
386            ws.inner.send(Message::Binary(data.into())).await
387                .map_err(|e| Error::Transport(format!("WebSocket send failed: {}", e)))
388        })
389    }
390
391    fn recv(&self) -> Pin<Box<dyn Future<Output = Result<Vec<u8>>> + Send + '_>> {
392        use futures::StreamExt;
393        use tokio_tungstenite::tungstenite::Message;
394
395        Box::pin(async move {
396            let mut guard = self.ws.lock().await;
397            let ws = guard.as_mut()
398                .ok_or_else(|| Error::Transport("connection closed".into()))?;
399
400            loop {
401                match ws.inner.next().await {
402                    Some(Ok(Message::Binary(data))) => return Ok(data.to_vec()),
403                    Some(Ok(Message::Text(text))) => return Ok(text.into_bytes()),
404                    Some(Ok(Message::Ping(_))) | Some(Ok(Message::Pong(_))) => continue,
405                    Some(Ok(Message::Close(_))) => return Err(Error::Transport("connection closed".into())),
406                    Some(Ok(Message::Frame(_))) => continue,
407                    Some(Err(e)) => return Err(Error::Transport(format!("WebSocket recv failed: {}", e))),
408                    None => return Err(Error::Transport("connection closed".into())),
409                }
410            }
411        })
412    }
413
414    fn close(&self) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
415        use futures::SinkExt;
416        use tokio_tungstenite::tungstenite::Message;
417
418        Box::pin(async move {
419            let mut guard = self.ws.lock().await;
420            if let Some(ws) = guard.as_mut() {
421                let _ = ws.inner.send(Message::Close(None)).await;
422            }
423            *guard = None;
424            Ok(())
425        })
426    }
427
428    fn is_connected(&self) -> bool {
429        true
430    }
431
432    fn local_addr(&self) -> Option<String> {
433        self.local_addr.clone()
434    }
435
436    fn peer_addr(&self) -> Option<String> {
437        self.peer_addr.clone()
438    }
439}
440
441/// UDP transport for fire-and-forget low-latency messaging
442///
443/// Provides unreliable datagram delivery with minimal overhead.
444/// Suitable for real-time applications where occasional packet loss is acceptable.
445pub struct UdpTransport {
446    socket: Arc<tokio::net::UdpSocket>,
447    peer_addr: Option<std::net::SocketAddr>,
448    local_addr: String,
449}
450
451impl UdpTransport {
452    /// Create a UDP transport bound to a local address
453    pub async fn bind(addr: &str) -> Result<Self> {
454        let socket = tokio::net::UdpSocket::bind(addr).await?;
455        let local_addr = socket.local_addr()?.to_string();
456
457        Ok(Self {
458            socket: Arc::new(socket),
459            peer_addr: None,
460            local_addr,
461        })
462    }
463
464    /// Connect to a remote UDP address (sets default destination)
465    pub async fn connect(local_addr: &str, peer_addr: &str) -> Result<Self> {
466        let socket = tokio::net::UdpSocket::bind(local_addr).await?;
467        let peer: std::net::SocketAddr = peer_addr.parse()
468            .map_err(|e| Error::Transport(format!("invalid peer address: {}", e)))?;
469        socket.connect(peer).await?;
470        let local = socket.local_addr()?.to_string();
471
472        Ok(Self {
473            socket: Arc::new(socket),
474            peer_addr: Some(peer),
475            local_addr: local,
476        })
477    }
478
479    /// Send a datagram to a specific address (connectionless)
480    pub async fn send_to(&self, data: &[u8], addr: &str) -> Result<()> {
481        let peer: std::net::SocketAddr = addr.parse()
482            .map_err(|e| Error::Transport(format!("invalid address: {}", e)))?;
483
484        if data.len() > MAX_MESSAGE_SIZE {
485            return Err(Error::Transport(format!(
486                "datagram too large: {} > {}",
487                data.len(),
488                MAX_MESSAGE_SIZE
489            )));
490        }
491
492        self.socket.send_to(data, peer).await?;
493        Ok(())
494    }
495
496    /// Receive a datagram with sender address
497    pub async fn recv_from(&self) -> Result<(Vec<u8>, std::net::SocketAddr)> {
498        let mut buf = vec![0u8; MAX_MESSAGE_SIZE];
499        let (len, addr) = self.socket.recv_from(&mut buf).await?;
500        buf.truncate(len);
501        Ok((buf, addr))
502    }
503}
504
505impl Transport for UdpTransport {
506    fn send(&self, data: &[u8]) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
507        let data = data.to_vec();
508        Box::pin(async move {
509            if data.len() > MAX_MESSAGE_SIZE {
510                return Err(Error::Transport(format!(
511                    "datagram too large: {} > {}",
512                    data.len(),
513                    MAX_MESSAGE_SIZE
514                )));
515            }
516
517            // For connected sockets, use send()
518            self.socket.send(&data).await?;
519            Ok(())
520        })
521    }
522
523    fn recv(&self) -> Pin<Box<dyn Future<Output = Result<Vec<u8>>> + Send + '_>> {
524        Box::pin(async move {
525            let mut buf = vec![0u8; MAX_MESSAGE_SIZE];
526            let len = self.socket.recv(&mut buf).await?;
527            buf.truncate(len);
528            Ok(buf)
529        })
530    }
531
532    fn close(&self) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
533        // UDP sockets don't need explicit close
534        Box::pin(async { Ok(()) })
535    }
536
537    fn is_connected(&self) -> bool {
538        self.peer_addr.is_some()
539    }
540
541    fn local_addr(&self) -> Option<String> {
542        Some(self.local_addr.clone())
543    }
544
545    fn peer_addr(&self) -> Option<String> {
546        self.peer_addr.map(|a| a.to_string())
547    }
548}
549
550/// Stdio transport for MCP subprocess servers
551///
552/// Spawns a subprocess and communicates via stdin/stdout with length-prefixed framing.
553pub struct StdioTransport {
554    child: Arc<Mutex<Option<tokio::process::Child>>>,
555    stdin: Arc<Mutex<Option<tokio::process::ChildStdin>>>,
556    stdout: Arc<Mutex<Option<BufReader<tokio::process::ChildStdout>>>>,
557    command: String,
558}
559
560impl StdioTransport {
561    /// Spawn a subprocess with the given command and arguments
562    pub async fn spawn(command: &str, args: &[&str]) -> Result<Self> {
563        use tokio::process::Command;
564
565        let mut child = Command::new(command)
566            .args(args)
567            .stdin(std::process::Stdio::piped())
568            .stdout(std::process::Stdio::piped())
569            .stderr(std::process::Stdio::inherit())
570            .spawn()
571            .map_err(|e| Error::Transport(format!("failed to spawn process: {}", e)))?;
572
573        let stdin = child.stdin.take()
574            .ok_or_else(|| Error::Transport("failed to capture stdin".into()))?;
575        let stdout = child.stdout.take()
576            .ok_or_else(|| Error::Transport("failed to capture stdout".into()))?;
577
578        Ok(Self {
579            child: Arc::new(Mutex::new(Some(child))),
580            stdin: Arc::new(Mutex::new(Some(stdin))),
581            stdout: Arc::new(Mutex::new(Some(BufReader::new(stdout)))),
582            command: command.to_string(),
583        })
584    }
585
586    /// Create from URL like "stdio:///path/to/binary?arg1&arg2"
587    pub async fn from_url(url: &Url) -> Result<Self> {
588        let command = url.path();
589        if command.is_empty() {
590            return Err(Error::Transport("stdio URL must specify command path".into()));
591        }
592
593        // Parse query parameters as arguments
594        let args: Vec<&str> = url.query()
595            .map(|q| q.split('&').collect())
596            .unwrap_or_default();
597
598        Self::spawn(command, &args).await
599    }
600}
601
602impl Transport for StdioTransport {
603    fn send(&self, data: &[u8]) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
604        let data = data.to_vec();
605        Box::pin(async move {
606            let mut guard = self.stdin.lock().await;
607            let stdin = guard.as_mut()
608                .ok_or_else(|| Error::Transport("stdin closed".into()))?;
609
610            if data.len() > MAX_MESSAGE_SIZE {
611                return Err(Error::Transport(format!(
612                    "message too large: {} > {}",
613                    data.len(),
614                    MAX_MESSAGE_SIZE
615                )));
616            }
617
618            // Write length prefix (big-endian)
619            let len = data.len() as u32;
620            stdin.write_all(&len.to_be_bytes()).await?;
621            stdin.write_all(&data).await?;
622            stdin.flush().await?;
623
624            Ok(())
625        })
626    }
627
628    fn recv(&self) -> Pin<Box<dyn Future<Output = Result<Vec<u8>>> + Send + '_>> {
629        Box::pin(async move {
630            let mut guard = self.stdout.lock().await;
631            let stdout = guard.as_mut()
632                .ok_or_else(|| Error::Transport("stdout closed".into()))?;
633
634            // Read length prefix
635            let mut len_buf = [0u8; FRAME_HEADER_SIZE];
636            stdout.read_exact(&mut len_buf).await?;
637            let len = u32::from_be_bytes(len_buf) as usize;
638
639            if len > MAX_MESSAGE_SIZE {
640                return Err(Error::Transport(format!(
641                    "message too large: {} > {}",
642                    len, MAX_MESSAGE_SIZE
643                )));
644            }
645
646            // Read message
647            let mut data = vec![0u8; len];
648            stdout.read_exact(&mut data).await?;
649
650            Ok(data)
651        })
652    }
653
654    fn close(&self) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
655        Box::pin(async move {
656            // Close stdin to signal subprocess
657            {
658                let mut guard = self.stdin.lock().await;
659                *guard = None;
660            }
661
662            // Wait for child to exit
663            let mut guard = self.child.lock().await;
664            if let Some(mut child) = guard.take() {
665                let _ = child.wait().await;
666            }
667
668            Ok(())
669        })
670    }
671
672    fn is_connected(&self) -> bool {
673        true
674    }
675
676    fn local_addr(&self) -> Option<String> {
677        Some(format!("stdio://{}", self.command))
678    }
679
680    fn peer_addr(&self) -> Option<String> {
681        Some(format!("stdio://{}", self.command))
682    }
683}
684
/// HTTP/SSE transport for MCP remote servers
///
/// Uses HTTP POST for sending messages and Server-Sent Events for receiving.
/// This is the standard transport for remote MCP servers.
///
/// Requires the `mcp` feature to be enabled.
#[cfg(feature = "mcp")]
pub struct HttpSseTransport {
    client: reqwest::Client,
    base_url: String,
    /// Events parsed by the SSE listener, oldest first, waiting for `recv`.
    recv_buffer: Arc<Mutex<Vec<Vec<u8>>>>,
    /// Cleared by `close`; the listener task stops once it observes `false`.
    connected: Arc<std::sync::atomic::AtomicBool>,
}

#[cfg(feature = "mcp")]
impl HttpSseTransport {
    /// Create a new HTTP/SSE transport for the given base URL
    ///
    /// No request is made here; trailing slashes are trimmed from `base_url`.
    /// Call [`start_sse_listener`](Self::start_sse_listener) to begin
    /// receiving.
    pub async fn connect(base_url: &str) -> Result<Self> {
        let client = reqwest::Client::builder()
            .timeout(std::time::Duration::from_secs(30))
            .build()
            .map_err(|e| Error::Transport(format!("failed to create HTTP client: {}", e)))?;

        let transport = Self {
            client,
            base_url: base_url.trim_end_matches('/').to_string(),
            recv_buffer: Arc::new(Mutex::new(Vec::new())),
            connected: Arc::new(std::sync::atomic::AtomicBool::new(true)),
        };

        Ok(transport)
    }

    /// Start the SSE listener for receiving messages
    ///
    /// Spawns a background task that streams `{base_url}/sse`, parses events
    /// and queues each event's data for `recv`. The task reconnects when the
    /// stream ends and stops after `close` clears the connected flag.
    ///
    /// Parsing details:
    /// - Bytes are buffered until a full line is available, so lines split
    ///   across network chunks are handled.
    /// - `data:` fields are accepted with or without the optional space.
    /// - Multiple `data:` lines in one event are joined with `\n`.
    /// - A blank line ends the event; events with no data are not queued.
    ///
    /// NOTE(review): the client built in `connect` has a 30 s timeout, which
    /// reqwest documents as a total request deadline; each stream presumably
    /// ends after about 30 s and is re-established here. Confirm this is
    /// intended.
    pub async fn start_sse_listener(&self) -> Result<()> {
        use futures::StreamExt;
        use std::sync::atomic::Ordering;

        let url = format!("{}/sse", self.base_url);
        let buffer = Arc::clone(&self.recv_buffer);
        let connected = Arc::clone(&self.connected);
        let client = self.client.clone();

        tokio::spawn(async move {
            let retry_delay = tokio::time::Duration::from_secs(1);

            while connected.load(Ordering::Relaxed) {
                // Treat HTTP error statuses like connection failures instead of
                // parsing an error page as an event stream.
                let attempt = client.get(&url).send().await
                    .and_then(|response| response.error_for_status());
                let response = match attempt {
                    Ok(response) => response,
                    Err(_) => {
                        // Reconnect after delay
                        tokio::time::sleep(retry_delay).await;
                        continue;
                    }
                };

                let mut stream = response.bytes_stream();
                // Bytes of a line whose terminating '\n' has not arrived yet.
                let mut pending: Vec<u8> = Vec::new();
                let mut event_data = String::new();
                let mut received_any = false;

                while let Some(chunk) = stream.next().await {
                    if !connected.load(Ordering::Relaxed) {
                        return;
                    }
                    let bytes = match chunk {
                        Ok(bytes) => bytes,
                        Err(_) => break,
                    };
                    received_any = true;
                    pending.extend_from_slice(&bytes);

                    // Decode only complete lines so multi-byte characters split
                    // across chunks are not mangled.
                    while let Some(newline) = pending.iter().position(|&b| b == b'\n') {
                        let raw: Vec<u8> = pending.drain(..=newline).collect();
                        let decoded = String::from_utf8_lossy(&raw);
                        let line = decoded.trim_end_matches(|c: char| c == '\r' || c == '\n');

                        if let Some(value) = line.strip_prefix("data:") {
                            let value = value.strip_prefix(' ').unwrap_or(value);
                            if !event_data.is_empty() {
                                event_data.push('\n');
                            }
                            event_data.push_str(value);
                        } else if line.is_empty() && !event_data.is_empty() {
                            // End of event
                            let message = std::mem::take(&mut event_data).into_bytes();
                            buffer.lock().await.push(message);
                        }
                    }
                }

                // A stream that closed without sending anything is treated as a
                // failure, so a misbehaving server cannot cause a hot loop.
                if !received_any {
                    tokio::time::sleep(retry_delay).await;
                }
            }
        });

        Ok(())
    }
}
767
#[cfg(feature = "mcp")]
impl Transport for HttpSseTransport {
    /// POST `data` to `{base_url}/message` with a JSON content type.
    ///
    /// Transport failures and non-success HTTP statuses are both reported as
    /// `Error::Transport`. The response body is ignored.
    fn send(&self, data: &[u8]) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
        let data = data.to_vec();
        let client = self.client.clone();
        let url = format!("{}/message", self.base_url);

        Box::pin(async move {
            client.post(&url)
                .header("Content-Type", "application/json")
                .body(data)
                .send()
                .await
                .map_err(|e| Error::Transport(format!("HTTP POST failed: {}", e)))?
                .error_for_status()
                .map_err(|e| Error::Transport(format!("HTTP error: {}", e)))?;

            Ok(())
        })
    }

    /// Return the oldest queued SSE event, waiting until one is available.
    ///
    /// The queue is polled every 10 ms. Events queued before `close` are still
    /// delivered; once the queue is empty and the transport is closed this
    /// fails with `Error::Transport("connection closed")` instead of polling
    /// forever.
    fn recv(&self) -> Pin<Box<dyn Future<Output = Result<Vec<u8>>> + Send + '_>> {
        let buffer = Arc::clone(&self.recv_buffer);
        let connected = Arc::clone(&self.connected);

        Box::pin(async move {
            // Poll buffer for messages
            loop {
                {
                    let mut guard = buffer.lock().await;
                    if !guard.is_empty() {
                        return Ok(guard.remove(0));
                    }
                }
                if !connected.load(std::sync::atomic::Ordering::Relaxed) {
                    return Err(Error::Transport("connection closed".into()));
                }
                // Small delay before checking again
                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
            }
        })
    }

    /// Mark the transport closed; the SSE listener task stops when it next
    /// checks the flag.
    fn close(&self) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>> {
        let connected = Arc::clone(&self.connected);
        Box::pin(async move {
            connected.store(false, std::sync::atomic::Ordering::Relaxed);
            Ok(())
        })
    }

    /// `true` until `close` is called; the server is not probed.
    fn is_connected(&self) -> bool {
        self.connected.load(std::sync::atomic::Ordering::Relaxed)
    }

    fn local_addr(&self) -> Option<String> {
        None
    }

    /// The base URL this transport talks to.
    fn peer_addr(&self) -> Option<String> {
        Some(self.base_url.clone())
    }
}
827
828/// Create a transport from a URL
829///
830/// Supported URL schemes:
831/// - `zap://` or `zap+tcp://` or `tcp://` - TCP transport
832/// - `zap+unix://` or `unix://` - Unix socket transport (Unix only)
833/// - `ws://` or `wss://` - WebSocket transport
834/// - `stdio://` - Stdio transport (for MCP subprocess servers)
835/// - `http://` or `https://` - HTTP/SSE transport (requires `mcp` feature)
836/// - `udp://` - UDP transport (fire-and-forget, low-latency)
837pub async fn connect(url: &str) -> Result<Box<dyn Transport>> {
838    let parsed = Url::parse(url)?;
839
840    match parsed.scheme() {
841        "zap" | "zap+tcp" | "tcp" => {
842            let host = parsed.host_str().unwrap_or("localhost");
843            let port = parsed.port().unwrap_or(crate::DEFAULT_PORT);
844            let addr = format!("{}:{}", host, port);
845            let transport = TcpTransport::connect(&addr).await?;
846            Ok(Box::new(transport))
847        }
848        #[cfg(unix)]
849        "zap+unix" | "unix" => {
850            let path = parsed.path();
851            let transport = UnixTransport::connect(path).await?;
852            Ok(Box::new(transport))
853        }
854        "ws" | "wss" => {
855            let transport = WebSocketTransport::connect(url).await?;
856            Ok(Box::new(transport))
857        }
858        "stdio" => {
859            // Stdio transport for subprocess MCP servers
860            let transport = StdioTransport::from_url(&parsed).await?;
861            Ok(Box::new(transport))
862        }
863        #[cfg(feature = "mcp")]
864        "http" | "https" => {
865            // HTTP/SSE transport for remote MCP servers
866            let transport = HttpSseTransport::connect(url).await?;
867            transport.start_sse_listener().await?;
868            Ok(Box::new(transport))
869        }
870        #[cfg(not(feature = "mcp"))]
871        "http" | "https" => {
872            Err(Error::Transport(
873                "HTTP/SSE transport requires 'mcp' feature".into()
874            ))
875        }
876        "udp" => {
877            // UDP transport for low-latency fire-and-forget messaging
878            let host = parsed.host_str().unwrap_or("127.0.0.1");
879            let port = parsed.port().unwrap_or(crate::DEFAULT_PORT);
880            let peer_addr = format!("{}:{}", host, port);
881            let transport = UdpTransport::connect("0.0.0.0:0", &peer_addr).await?;
882            Ok(Box::new(transport))
883        }
884        _ => Err(Error::Transport(format!(
885            "unsupported URL scheme: {}",
886            parsed.scheme()
887        ))),
888    }
889}
890
#[cfg(test)]
mod tests {
    // Loopback tests for the TCP, Unix-socket and UDP transports, plus the
    // URL-scheme dispatch performed by `connect`.
    use super::*;

    /// Sends one framed message over loopback TCP and expects the server task
    /// to echo it back unchanged.
    #[tokio::test]
    async fn test_tcp_transport_roundtrip() {
        // Bind to port 0 and read the actual address back from the listener.
        let listener = TcpTransportListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().to_string();

        // Echo server: accept a single connection and bounce a single message.
        let server_task = tokio::spawn(async move {
            let transport = listener.accept().await.unwrap();
            let msg = transport.recv().await.unwrap();
            transport.send(&msg).await.unwrap();
        });

        let client = TcpTransport::connect(&addr).await.unwrap();

        let test_msg = b"Hello, ZAP!";
        client.send(test_msg).await.unwrap();
        let response = client.recv().await.unwrap();

        assert_eq!(response, test_msg);

        // Close the client, then join the server task so that a panic inside
        // it fails this test instead of being lost.
        client.close().await.unwrap();
        server_task.await.unwrap();
    }

    /// A `zap://` URL must be accepted by the parser; the connection attempt
    /// itself is expected to fail.
    #[tokio::test]
    async fn test_connect_tcp_url() {
        // NOTE(review): this assumes nothing is listening on localhost:9999
        // on the machine running the tests.
        let result = connect("zap://localhost:9999").await;
        assert!(result.is_err());
    }

    /// An unknown scheme must be rejected with `Error::Transport` whose
    /// message mentions that the scheme is unsupported.
    #[tokio::test]
    async fn test_connect_invalid_scheme() {
        // Match exhaustively so that a success, or an error of a different
        // variant, fails the test instead of silently skipping the check.
        match connect("ftp://localhost:9999").await {
            Err(Error::Transport(msg)) => assert!(
                msg.contains("unsupported"),
                "unexpected transport error message: {}",
                msg
            ),
            _ => panic!("expected Error::Transport for an unsupported URL scheme"),
        }
    }

    /// Sends one framed message over a Unix domain socket and expects the
    /// server task to echo it back unchanged.
    #[cfg(unix)]
    #[tokio::test]
    async fn test_unix_transport_roundtrip() {
        use std::env::temp_dir;

        // The process id keeps the socket path unique among concurrently
        // running test processes.
        let socket_path = temp_dir().join(format!("zap_test_{}.sock", std::process::id()));
        let socket_str = socket_path.to_str().unwrap().to_string();

        // Best effort: clear a socket file left behind by an earlier run that
        // happened to have the same pid. A missing file is not an error here.
        let _ = std::fs::remove_file(&socket_path);

        let listener = UnixTransportListener::bind(&socket_str).await.unwrap();

        // Echo server: accept a single connection and bounce a single message.
        let server_task = tokio::spawn(async move {
            let transport = listener.accept().await.unwrap();
            let msg = transport.recv().await.unwrap();
            transport.send(&msg).await.unwrap();
        });

        // Brief pause so the spawned server task can reach `accept` before
        // the client connects.
        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;

        let client = UnixTransport::connect(&socket_str).await.unwrap();

        let test_msg = b"Unix socket test!";
        client.send(test_msg).await.unwrap();
        let response = client.recv().await.unwrap();

        assert_eq!(response, test_msg);

        client.close().await.unwrap();
        server_task.await.unwrap();

        // Best effort: do not leave the socket file behind in the temp dir.
        // Ignored on failure in case the listener already removed it.
        let _ = std::fs::remove_file(&socket_path);
    }

    /// Exchanges datagrams in both directions between a bound UDP transport
    /// and one that was created with a default peer.
    #[tokio::test]
    async fn test_udp_transport_roundtrip() {
        let server = UdpTransport::bind("127.0.0.1:0").await.unwrap();
        let server_addr = server.local_addr().unwrap();

        let client = UdpTransport::connect("127.0.0.1:0", &server_addr).await.unwrap();
        let client_addr = client.local_addr().unwrap();

        // Client -> server, using the client's default peer.
        let test_msg = b"UDP test message";
        client.send(test_msg).await.unwrap();

        // The server must see both the payload and the client's address.
        let (received, sender) = server.recv_from().await.unwrap();
        assert_eq!(&received, test_msg);
        assert_eq!(sender.to_string(), client_addr);

        // Server -> client, addressed explicitly because the server has no
        // default peer.
        server.send_to(b"response", &client_addr).await.unwrap();

        let (response, _) = client.recv_from().await.unwrap();
        assert_eq!(&response, b"response");
    }

    /// `is_connected` is true for a transport created with `connect` and
    /// false for one created with `bind` only.
    #[tokio::test]
    async fn test_udp_transport_connected_mode() {
        let receiver = UdpTransport::bind("127.0.0.1:0").await.unwrap();
        let recv_addr = receiver.local_addr().unwrap();

        let sender = UdpTransport::connect("127.0.0.1:0", &recv_addr).await.unwrap();

        // A default peer was supplied, so the sender reports connected.
        assert!(sender.is_connected());

        // Bound-only mode has no default peer and reports not connected.
        assert!(!receiver.is_connected());
    }

    /// A `udp://` URL must yield a connected transport with a peer address,
    /// even when no server is listening on the target port.
    #[tokio::test]
    async fn test_connect_udp_url() {
        let transport = connect("udp://127.0.0.1:5555")
            .await
            .expect("creating a UDP transport should not require a listening server");

        assert!(transport.is_connected());
        assert!(transport.peer_addr().is_some());
    }
}