// rpc_runtime_transport_ipc/lib.rs

1use std::fmt;
2use std::io;
3use std::net::SocketAddr;
4use std::path::PathBuf;
5use std::sync::Arc;
6
7use rpc_runtime_codec_msgpack::{
8    CodecLimits, DEFAULT_MAX_MESSAGE_SIZE, decode_envelope, encode_envelope,
9};
10use rpc_runtime_core::Envelope;
11use rpc_runtime_errors::RuntimeErrorCode;
12use rpc_runtime_transport::{
13    ConnectionScope, EnvelopeReader, EnvelopeWriter, RpcConnection, RpcListener, RpcReceiver,
14    RpcSender, TransportFuture, is_local_socket_addr,
15};
16pub use rpc_runtime_transport::{
17    RpcReceiver as IpcReceiver, RpcSender as IpcSender, TransportError,
18};
19use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, split};
20use tokio::net::{TcpListener, TcpStream};
21use tokio::sync::Mutex;
22use tracing::{debug, trace, warn};
23
24#[cfg(unix)]
25use tokio::net::{UnixListener, UnixStream};
26
27#[cfg(windows)]
28use tokio::net::windows::named_pipe::{ClientOptions, NamedPipeServer, PipeMode, ServerOptions};
29
/// Number of bytes in the length prefix that precedes every frame payload.
///
/// The prefix is encoded as a big-endian `u32`, hence the size of a `u32` (4 bytes).
pub const FRAME_LENGTH_PREFIX_SIZE: usize = std::mem::size_of::<u32>();
31
32#[derive(Debug, Clone, Copy, PartialEq, Eq)]
33pub struct FrameConfig {
34    pub max_frame_size: usize,
35}
36
37impl Default for FrameConfig {
38    fn default() -> Self {
39        Self {
40            max_frame_size: DEFAULT_MAX_MESSAGE_SIZE,
41        }
42    }
43}
44
/// Address of an IPC endpoint that a listener can bind to or a client can connect to.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IpcEndpoint {
    /// A logical name that is resolved to the platform's native endpoint kind
    /// (named pipe on Windows, Unix socket on Unix) before use.
    PlatformDefault { name: String },
    /// A Windows named pipe identified by `name`.
    NamedPipe { name: String },
    /// A Unix domain socket located at `path`.
    UnixSocket { path: PathBuf },
    /// A TCP socket at `addr`.
    Tcp { addr: SocketAddr },
}

impl IpcEndpoint {
    /// Creates a [`IpcEndpoint::PlatformDefault`] endpoint from a logical name.
    pub fn platform_default(name: impl Into<String>) -> Self {
        let name = name.into();
        Self::PlatformDefault { name }
    }

    /// Creates a [`IpcEndpoint::Tcp`] endpoint for the given socket address.
    pub fn tcp(addr: SocketAddr) -> Self {
        Self::Tcp { addr }
    }
}
62
63pub struct IpcListener {
64    inner: ListenerInner,
65    config: FrameConfig,
66    connection_scope: ConnectionScope,
67}
68
69enum ListenerInner {
70    Tcp(TcpListener),
71    #[cfg(unix)]
72    Unix(UnixListener),
73    #[cfg(windows)]
74    NamedPipe {
75        path: String,
76        pending: Option<NamedPipeServer>,
77    },
78}
79
80impl fmt::Debug for IpcListener {
81    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
82        formatter
83            .debug_struct("IpcListener")
84            .field("config", &self.config)
85            .finish_non_exhaustive()
86    }
87}
88
89impl IpcListener {
90    pub async fn bind(endpoint: IpcEndpoint, config: FrameConfig) -> Result<Self, TransportError> {
91        match normalize_endpoint(endpoint)? {
92            IpcEndpoint::Tcp { addr } => {
93                let listener = TcpListener::bind(addr).await?;
94                Ok(Self {
95                    inner: ListenerInner::Tcp(listener),
96                    config,
97                    connection_scope: ConnectionScope::default(),
98                })
99            }
100            #[cfg(unix)]
101            IpcEndpoint::UnixSocket { path } => {
102                let listener = UnixListener::bind(path)?;
103                Ok(Self {
104                    inner: ListenerInner::Unix(listener),
105                    config,
106                    connection_scope: ConnectionScope::default(),
107                })
108            }
109            #[cfg(windows)]
110            IpcEndpoint::NamedPipe { name } => {
111                let path = named_pipe_path(&name);
112                let pending = create_named_pipe_server(&path)?;
113                Ok(Self {
114                    inner: ListenerInner::NamedPipe {
115                        path,
116                        pending: Some(pending),
117                    },
118                    config,
119                    connection_scope: ConnectionScope::default(),
120                })
121            }
122            #[allow(unreachable_patterns)]
123            endpoint => Err(TransportError::runtime(
124                RuntimeErrorCode::UnsupportedCapability,
125                format!("endpoint `{endpoint:?}` is not supported on this platform"),
126            )),
127        }
128    }
129
130    pub fn local_addr(&self) -> Option<SocketAddr> {
131        match &self.inner {
132            ListenerInner::Tcp(listener) => listener.local_addr().ok(),
133            #[cfg(unix)]
134            ListenerInner::Unix(_) => None,
135            #[cfg(windows)]
136            ListenerInner::NamedPipe { .. } => None,
137        }
138    }
139
140    pub async fn accept(&mut self) -> Result<IpcConnection, TransportError> {
141        match &mut self.inner {
142            ListenerInner::Tcp(listener) => {
143                let (stream, peer_addr) = listener.accept().await?;
144                validate_peer_scope(peer_addr, self.connection_scope)?;
145                Ok(IpcConnection::from_stream(stream, self.config))
146            }
147            #[cfg(unix)]
148            ListenerInner::Unix(listener) => {
149                let (stream, _) = listener.accept().await?;
150                Ok(IpcConnection::from_stream(stream, self.config))
151            }
152            #[cfg(windows)]
153            ListenerInner::NamedPipe { path, pending } => {
154                let server = pending.take().ok_or_else(|| {
155                    io::Error::new(io::ErrorKind::NotConnected, "no pending named pipe server")
156                })?;
157                server.connect().await?;
158                *pending = Some(create_named_pipe_server(path)?);
159                Ok(IpcConnection::from_stream(server, self.config))
160            }
161        }
162    }
163}
164
165pub struct IpcConnection {
166    inner: RpcConnection,
167}
168
169impl IpcConnection {
170    pub async fn connect(
171        endpoint: IpcEndpoint,
172        config: FrameConfig,
173    ) -> Result<Self, TransportError> {
174        match normalize_endpoint(endpoint)? {
175            IpcEndpoint::Tcp { addr } => {
176                let stream = TcpStream::connect(addr).await?;
177                Ok(Self::from_stream(stream, config))
178            }
179            #[cfg(unix)]
180            IpcEndpoint::UnixSocket { path } => {
181                let stream = UnixStream::connect(path).await?;
182                Ok(Self::from_stream(stream, config))
183            }
184            #[cfg(windows)]
185            IpcEndpoint::NamedPipe { name } => {
186                let client = ClientOptions::new().open(named_pipe_path(&name))?;
187                Ok(Self::from_stream(client, config))
188            }
189            #[allow(unreachable_patterns)]
190            endpoint => Err(TransportError::runtime(
191                RuntimeErrorCode::UnsupportedCapability,
192                format!("endpoint `{endpoint:?}` is not supported on this platform"),
193            )),
194        }
195    }
196
197    pub fn from_stream<T>(stream: T, config: FrameConfig) -> Self
198    where
199        T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
200    {
201        let (reader, writer) = split(stream);
202        let writer = Arc::new(IpcFrameWriter {
203            writer: Arc::new(Mutex::new(Box::new(writer))),
204            config,
205        });
206        let reader = Box::new(IpcFrameReader {
207            reader: Box::new(reader),
208            config,
209        });
210        Self {
211            inner: RpcConnection::new(RpcSender::new(writer), RpcReceiver::new(reader)),
212        }
213    }
214
215    pub fn split(self) -> (IpcSender, IpcReceiver) {
216        self.inner.split()
217    }
218
219    pub fn into_connection(self) -> RpcConnection {
220        self.inner
221    }
222}
223
224impl From<IpcConnection> for RpcConnection {
225    fn from(value: IpcConnection) -> Self {
226        value.into_connection()
227    }
228}
229
230impl RpcListener for IpcListener {
231    fn accept<'a>(&'a mut self) -> TransportFuture<'a, RpcConnection> {
232        Box::pin(async move { Ok(IpcListener::accept(self).await?.into_connection()) })
233    }
234
235    fn set_connection_scope(&mut self, scope: ConnectionScope) {
236        self.connection_scope = scope;
237    }
238}
239
240struct IpcFrameWriter {
241    writer: BoxedWriter,
242    config: FrameConfig,
243}
244
245impl EnvelopeWriter for IpcFrameWriter {
246    fn send_envelope<'a>(&'a self, envelope: &'a Envelope) -> TransportFuture<'a, ()> {
247        Box::pin(async move {
248            let bytes = encode_envelope(envelope)?;
249            let mut writer = self.writer.lock().await;
250            write_frame_bytes(&mut *writer, &bytes, self.config).await
251        })
252    }
253
254    fn shutdown<'a>(&'a self) -> TransportFuture<'a, ()> {
255        Box::pin(async move {
256            let mut writer = self.writer.lock().await;
257            writer.shutdown().await?;
258            Ok(())
259        })
260    }
261}
262
263struct IpcFrameReader {
264    reader: BoxedReader,
265    config: FrameConfig,
266}
267
268impl EnvelopeReader for IpcFrameReader {
269    fn recv_envelope<'a>(&'a mut self) -> TransportFuture<'a, Option<Envelope>> {
270        Box::pin(async move {
271            let Some(bytes) = read_frame_bytes(&mut *self.reader, self.config).await? else {
272                return Ok(None);
273            };
274            let envelope = match decode_envelope(
275                &bytes,
276                CodecLimits {
277                    max_message_size: self.config.max_frame_size,
278                },
279            ) {
280                Ok(envelope) => envelope,
281                Err(err) => {
282                    warn!(
283                        frame_len = bytes.len(),
284                        max_frame_size = self.config.max_frame_size,
285                        error = %err,
286                        "ipc transport failed to decode envelope"
287                    );
288                    return Err(err.into());
289                }
290            };
291            Ok(Some(envelope))
292        })
293    }
294}
295
296pub async fn write_frame_bytes<W>(
297    writer: &mut W,
298    payload: &[u8],
299    config: FrameConfig,
300) -> Result<(), TransportError>
301where
302    W: AsyncWrite + Unpin + ?Sized,
303{
304    validate_outbound_len(payload.len(), config)?;
305    trace!(
306        frame_len = payload.len(),
307        max_frame_size = config.max_frame_size,
308        "ipc transport writing frame"
309    );
310    writer
311        .write_all(&(payload.len() as u32).to_be_bytes())
312        .await?;
313    writer.write_all(payload).await?;
314    writer.flush().await?;
315    Ok(())
316}
317
318pub async fn read_frame_bytes<R>(
319    reader: &mut R,
320    config: FrameConfig,
321) -> Result<Option<Vec<u8>>, TransportError>
322where
323    R: AsyncRead + Unpin + ?Sized,
324{
325    let mut prefix = [0_u8; FRAME_LENGTH_PREFIX_SIZE];
326    match reader.read_exact(&mut prefix).await {
327        Ok(_) => {}
328        Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => {
329            debug!("ipc transport reached eof before frame prefix");
330            return Ok(None);
331        }
332        Err(err) => {
333            warn!(error = %err, "ipc transport failed to read frame prefix");
334            return Err(err.into());
335        }
336    }
337
338    let len = u32::from_be_bytes(prefix) as usize;
339    validate_inbound_len(len, config)?;
340    trace!(
341        frame_len = len,
342        max_frame_size = config.max_frame_size,
343        "ipc transport reading frame"
344    );
345    let mut payload = vec![0_u8; len];
346    reader.read_exact(&mut payload).await.map_err(|err| {
347        if err.kind() == io::ErrorKind::UnexpectedEof {
348            warn!(
349                frame_len = len,
350                error = %err,
351                "ipc transport peer disconnected before full frame payload was read"
352            );
353            io::Error::new(
354                io::ErrorKind::UnexpectedEof,
355                "peer disconnected before full frame payload was read",
356            )
357        } else {
358            warn!(
359                frame_len = len,
360                error = %err,
361                "ipc transport failed to read frame payload"
362            );
363            err
364        }
365    })?;
366    Ok(Some(payload))
367}
368
369fn validate_outbound_len(len: usize, config: FrameConfig) -> Result<(), TransportError> {
370    if len == 0 {
371        warn!("ipc transport rejected zero-length outbound frame");
372        return Err(TransportError::runtime(
373            RuntimeErrorCode::InvalidEnvelope,
374            "frame payload length must not be zero",
375        ));
376    }
377    if len > u32::MAX as usize {
378        warn!(
379            frame_len = len,
380            "ipc transport rejected outbound frame exceeding u32 range"
381        );
382        return Err(TransportError::runtime(
383            RuntimeErrorCode::InvalidEnvelope,
384            "frame payload length exceeds u32 range",
385        ));
386    }
387    if len > config.max_frame_size {
388        warn!(
389            frame_len = len,
390            max_frame_size = config.max_frame_size,
391            "ipc transport rejected oversized outbound frame"
392        );
393        return Err(TransportError::runtime(
394            RuntimeErrorCode::InvalidEnvelope,
395            format!(
396                "frame payload length {len} exceeds max frame size {}",
397                config.max_frame_size
398            ),
399        ));
400    }
401    validate_inbound_len(len, config)
402}
403
404fn validate_inbound_len(len: usize, config: FrameConfig) -> Result<(), TransportError> {
405    if len == 0 {
406        warn!("ipc transport rejected zero-length inbound frame");
407        return Err(TransportError::runtime(
408            RuntimeErrorCode::InvalidEnvelope,
409            "frame payload length must not be zero",
410        ));
411    }
412    if len > config.max_frame_size {
413        warn!(
414            frame_len = len,
415            max_frame_size = config.max_frame_size,
416            "ipc transport rejected oversized inbound frame"
417        );
418        return Err(TransportError::runtime(
419            RuntimeErrorCode::InvalidEnvelope,
420            format!(
421                "frame payload length {len} exceeds max frame size {}",
422                config.max_frame_size
423            ),
424        ));
425    }
426    Ok(())
427}
428
429fn validate_peer_scope(addr: SocketAddr, scope: ConnectionScope) -> Result<(), TransportError> {
430    if scope == ConnectionScope::LocalOnly && !is_local_socket_addr(&addr) {
431        warn!(
432            peer_addr = %addr,
433            "ipc transport rejected non-local TCP peer"
434        );
435        return Err(TransportError::runtime(
436            RuntimeErrorCode::AccessDenied,
437            "non-local TCP peer is not allowed",
438        ));
439    }
440    trace!(peer_addr = %addr, "ipc transport accepted TCP peer");
441    Ok(())
442}
443
444fn normalize_endpoint(endpoint: IpcEndpoint) -> Result<IpcEndpoint, TransportError> {
445    match endpoint {
446        IpcEndpoint::PlatformDefault { name } => {
447            #[cfg(windows)]
448            {
449                Ok(IpcEndpoint::NamedPipe { name })
450            }
451            #[cfg(unix)]
452            {
453                Ok(IpcEndpoint::UnixSocket {
454                    path: PathBuf::from(format!("/tmp/tripley-rpc-{name}.sock")),
455                })
456            }
457            #[cfg(not(any(windows, unix)))]
458            {
459                let _ = name;
460                Err(TransportError::runtime(
461                    RuntimeErrorCode::UnsupportedCapability,
462                    "platform default IPC endpoint is not supported on this platform",
463                ))
464            }
465        }
466        endpoint => Ok(endpoint),
467    }
468}
469
/// Turns a pipe name into a full named-pipe path.
///
/// Names that already start with `\\.\pipe\` are returned unchanged; any other
/// name is placed under `\\.\pipe\tripley-rpc-`.
#[cfg(windows)]
fn named_pipe_path(name: &str) -> String {
    const PIPE_NAMESPACE: &str = r"\\.\pipe\";
    let already_qualified = name.starts_with(PIPE_NAMESPACE);
    if !already_qualified {
        return format!(r"\\.\pipe\tripley-rpc-{name}");
    }
    name.to_owned()
}
478
/// Creates a message-mode named pipe server instance at `path`.
///
/// # Errors
///
/// Returns the I/O error reported when the pipe instance cannot be created.
#[cfg(windows)]
fn create_named_pipe_server(path: &str) -> io::Result<NamedPipeServer> {
    let mut options = ServerOptions::new();
    // NOTE(review): `first_pipe_instance(false)` is also used for the very first
    // instance created at bind time, so binding presumably succeeds even if
    // another process already owns a pipe with this name; confirm this is intended.
    options
        .pipe_mode(PipeMode::Message)
        .first_pipe_instance(false);
    options.create(path)
}
486
487type BoxedReader = Box<dyn AsyncRead + Unpin + Send>;
488type BoxedWriter = Arc<Mutex<Box<dyn AsyncWrite + Unpin + Send>>>;
489
490#[cfg(test)]
491mod tests {
492    use super::*;
493    use rmpv::Value;
494    use rpc_runtime_core::{InstanceId, MethodId, Request, RequestId, ResponseOk};
495    use tokio::io::duplex;
496
497    fn request_envelope(request_id: u64) -> Envelope {
498        Envelope::Request(Request {
499            request_id: RequestId::new(request_id),
500            instance_id: InstanceId::new(1).expect("non-zero instance id"),
501            method_id: MethodId::new(2),
502            payload: Value::Nil,
503        })
504    }
505
506    #[tokio::test]
507    async fn frame_bytes_roundtrip_over_duplex() {
508        let (mut client, mut server) = duplex(1024);
509        let payload = vec![1, 2, 3, 4];
510
511        let write = tokio::spawn(async move {
512            write_frame_bytes(&mut client, &payload, FrameConfig::default()).await
513        });
514        let read = read_frame_bytes(&mut server, FrameConfig::default())
515            .await
516            .expect("read frame");
517
518        write.await.expect("writer task").expect("write frame");
519        assert_eq!(read, Some(vec![1, 2, 3, 4]));
520    }
521
522    #[tokio::test]
523    async fn consecutive_frames_are_read_in_order() {
524        let (mut client, mut server) = duplex(1024);
525        write_frame_bytes(&mut client, &[1], FrameConfig::default())
526            .await
527            .expect("write first frame");
528        write_frame_bytes(&mut client, &[2, 3], FrameConfig::default())
529            .await
530            .expect("write second frame");
531
532        assert_eq!(
533            read_frame_bytes(&mut server, FrameConfig::default())
534                .await
535                .expect("read first"),
536            Some(vec![1])
537        );
538        assert_eq!(
539            read_frame_bytes(&mut server, FrameConfig::default())
540                .await
541                .expect("read second"),
542            Some(vec![2, 3])
543        );
544    }
545
546    #[tokio::test]
547    async fn zero_length_frame_fails() {
548        let (mut client, mut server) = duplex(1024);
549        client.write_all(&0_u32.to_be_bytes()).await.expect("write");
550
551        let err = read_frame_bytes(&mut server, FrameConfig::default())
552            .await
553            .expect_err("must fail");
554        assert_error_code(err, RuntimeErrorCode::InvalidEnvelope);
555    }
556
557    #[tokio::test]
558    async fn oversized_frame_fails_before_payload_allocation() {
559        let (mut client, mut server) = duplex(1024);
560        client.write_all(&8_u32.to_be_bytes()).await.expect("write");
561
562        let err = read_frame_bytes(&mut server, FrameConfig { max_frame_size: 4 })
563            .await
564            .expect_err("must fail");
565        assert_error_code(err, RuntimeErrorCode::InvalidEnvelope);
566    }
567
568    #[tokio::test]
569    async fn clean_eof_before_prefix_returns_none() {
570        let (client, mut server) = duplex(1024);
571        drop(client);
572
573        let frame = read_frame_bytes(&mut server, FrameConfig::default())
574            .await
575            .expect("read eof");
576        assert_eq!(frame, None);
577    }
578
579    #[tokio::test]
580    async fn truncated_payload_is_io_error() {
581        let (mut client, mut server) = duplex(1024);
582        client
583            .write_all(&4_u32.to_be_bytes())
584            .await
585            .expect("prefix");
586        client.write_all(&[1, 2]).await.expect("partial payload");
587        drop(client);
588
589        let err = read_frame_bytes(&mut server, FrameConfig::default())
590            .await
591            .expect_err("must fail");
592        assert!(matches!(err, TransportError::Io(_)));
593    }
594
595    #[tokio::test]
596    async fn tcp_loopback_echoes_envelopes() {
597        let mut listener = IpcListener::bind(
598            IpcEndpoint::Tcp {
599                addr: "127.0.0.1:0".parse().expect("loopback addr"),
600            },
601            FrameConfig::default(),
602        )
603        .await
604        .expect("bind tcp");
605        let addr = listener.local_addr().expect("local addr");
606
607        let server = tokio::spawn(async move {
608            let connection = listener.accept().await.expect("accept");
609            let (sender, mut receiver) = connection.split();
610            let envelope = receiver
611                .recv_envelope()
612                .await
613                .expect("receive request")
614                .expect("request envelope");
615            assert!(matches!(envelope, Envelope::Request(_)));
616            sender
617                .send_envelope(&Envelope::ResponseOk(ResponseOk {
618                    request_id: RequestId::new(1),
619                    payload: Value::Nil,
620                }))
621                .await
622                .expect("send response");
623        });
624
625        let connection = IpcConnection::connect(IpcEndpoint::Tcp { addr }, FrameConfig::default())
626            .await
627            .expect("connect tcp");
628        let (sender, mut receiver) = connection.split();
629        sender
630            .send_envelope(&request_envelope(1))
631            .await
632            .expect("send request");
633        let response = receiver
634            .recv_envelope()
635            .await
636            .expect("receive response")
637            .expect("response envelope");
638
639        assert!(matches!(response, Envelope::ResponseOk(_)));
640        server.await.expect("server task");
641    }
642
643    #[tokio::test]
644    async fn cloned_senders_serialize_concurrent_writes() {
645        let (client, server) = duplex(4096);
646        let client = IpcConnection::from_stream(client, FrameConfig::default());
647        let server = IpcConnection::from_stream(server, FrameConfig::default());
648        let (sender, _) = client.split();
649        let (_, mut receiver) = server.split();
650
651        let mut tasks = Vec::new();
652        for id in 1..=16 {
653            let sender = sender.clone();
654            tasks.push(tokio::spawn(async move {
655                sender
656                    .send_envelope(&request_envelope(id))
657                    .await
658                    .expect("send request");
659            }));
660        }
661        for task in tasks {
662            task.await.expect("send task");
663        }
664
665        let mut ids = Vec::new();
666        for _ in 0..16 {
667            let Some(Envelope::Request(request)) =
668                receiver.recv_envelope().await.expect("receive request")
669            else {
670                panic!("expected request envelope");
671            };
672            ids.push(request.request_id.get());
673        }
674        ids.sort_unstable();
675        assert_eq!(ids, (1..=16).collect::<Vec<_>>());
676    }
677
678    #[cfg(windows)]
679    #[tokio::test]
680    async fn windows_named_pipe_echoes_envelopes() {
681        let name = unique_test_name("transport-test");
682        let mut listener = IpcListener::bind(
683            IpcEndpoint::NamedPipe { name: name.clone() },
684            FrameConfig::default(),
685        )
686        .await
687        .expect("bind named pipe");
688
689        let server = tokio::spawn(async move {
690            let connection = listener.accept().await.expect("accept");
691            let (sender, mut receiver) = connection.split();
692            receiver
693                .recv_envelope()
694                .await
695                .expect("receive")
696                .expect("envelope");
697            sender
698                .send_envelope(&Envelope::ResponseOk(ResponseOk {
699                    request_id: RequestId::new(1),
700                    payload: Value::Nil,
701                }))
702                .await
703                .expect("send");
704        });
705
706        let connection =
707            IpcConnection::connect(IpcEndpoint::NamedPipe { name }, FrameConfig::default())
708                .await
709                .expect("connect named pipe");
710        let (sender, mut receiver) = connection.split();
711        sender
712            .send_envelope(&request_envelope(1))
713            .await
714            .expect("send request");
715        assert!(matches!(
716            receiver
717                .recv_envelope()
718                .await
719                .expect("receive response")
720                .expect("response"),
721            Envelope::ResponseOk(_)
722        ));
723        server.await.expect("server task");
724    }
725
726    #[cfg(unix)]
727    #[tokio::test]
728    async fn unix_socket_echoes_envelopes() {
729        let path = std::env::temp_dir().join(format!("{}.sock", unique_test_name("tripley-rpc")));
730        let mut listener = IpcListener::bind(
731            IpcEndpoint::UnixSocket { path: path.clone() },
732            FrameConfig::default(),
733        )
734        .await
735        .expect("bind unix socket");
736
737        let server = tokio::spawn(async move {
738            let connection = listener.accept().await.expect("accept");
739            let (sender, mut receiver) = connection.split();
740            receiver
741                .recv_envelope()
742                .await
743                .expect("receive")
744                .expect("envelope");
745            sender
746                .send_envelope(&Envelope::ResponseOk(ResponseOk {
747                    request_id: RequestId::new(1),
748                    payload: Value::Nil,
749                }))
750                .await
751                .expect("send");
752        });
753
754        let connection = IpcConnection::connect(
755            IpcEndpoint::UnixSocket { path: path.clone() },
756            FrameConfig::default(),
757        )
758        .await
759        .expect("connect unix socket");
760        let (sender, mut receiver) = connection.split();
761        sender
762            .send_envelope(&request_envelope(1))
763            .await
764            .expect("send request");
765        assert!(matches!(
766            receiver
767                .recv_envelope()
768                .await
769                .expect("receive response")
770                .expect("response"),
771            Envelope::ResponseOk(_)
772        ));
773        server.await.expect("server task");
774        std::fs::remove_file(path).expect("remove unix socket");
775    }
776
777    fn assert_error_code(error: TransportError, code: RuntimeErrorCode) {
778        match error {
779            TransportError::Runtime(error) => assert_eq!(error.code, code),
780            TransportError::Io(error) => panic!("expected runtime error, got I/O error: {error}"),
781        }
782    }
783
784    #[test]
785    fn local_only_accepts_loopback_tcp_peer() {
786        validate_peer_scope(
787            "127.0.0.1:1000".parse().expect("addr"),
788            ConnectionScope::LocalOnly,
789        )
790        .expect("local peer");
791    }
792
793    #[test]
794    fn local_only_rejects_non_loopback_tcp_peer() {
795        let err = validate_peer_scope(
796            "192.0.2.10:1000".parse().expect("addr"),
797            ConnectionScope::LocalOnly,
798        )
799        .expect_err("must reject");
800
801        assert_error_code(err, RuntimeErrorCode::AccessDenied);
802    }
803
804    #[test]
805    fn remote_allowed_accepts_non_loopback_tcp_peer() {
806        validate_peer_scope(
807            "192.0.2.10:1000".parse().expect("addr"),
808            ConnectionScope::RemoteAllowed,
809        )
810        .expect("remote peer");
811    }
812
813    fn unique_test_name(prefix: &str) -> String {
814        let nanos = std::time::SystemTime::now()
815            .duration_since(std::time::UNIX_EPOCH)
816            .expect("system clock after unix epoch")
817            .as_nanos();
818        format!("{prefix}-{}-{nanos}", std::process::id())
819    }
820}