1use std::fmt;
2use std::io;
3use std::net::SocketAddr;
4use std::path::PathBuf;
5use std::sync::Arc;
6
7use rpc_runtime_codec_msgpack::{
8 CodecLimits, DEFAULT_MAX_MESSAGE_SIZE, decode_envelope, encode_envelope,
9};
10use rpc_runtime_core::Envelope;
11use rpc_runtime_errors::RuntimeErrorCode;
12use rpc_runtime_transport::{
13 ConnectionScope, EnvelopeReader, EnvelopeWriter, RpcConnection, RpcListener, RpcReceiver,
14 RpcSender, TransportFuture, is_local_socket_addr,
15};
16pub use rpc_runtime_transport::{
17 RpcReceiver as IpcReceiver, RpcSender as IpcSender, TransportError,
18};
19use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, split};
20use tokio::net::{TcpListener, TcpStream};
21use tokio::sync::Mutex;
22use tracing::{debug, trace, warn};
23
24#[cfg(unix)]
25use tokio::net::{UnixListener, UnixStream};
26
27#[cfg(windows)]
28use tokio::net::windows::named_pipe::{ClientOptions, NamedPipeServer, PipeMode, ServerOptions};
29
/// Size in bytes of the length prefix written in front of every frame payload.
///
/// The prefix is a big-endian `u32`, so this is the width of a `u32`.
pub const FRAME_LENGTH_PREFIX_SIZE: usize = std::mem::size_of::<u32>();
31
/// Framing limits shared by the read and write sides of a connection.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct FrameConfig {
    /// Largest payload length, in bytes, accepted for a single frame.
    ///
    /// The length prefix itself is not counted against this limit.
    pub max_frame_size: usize,
}
36
37impl Default for FrameConfig {
38 fn default() -> Self {
39 Self {
40 max_frame_size: DEFAULT_MAX_MESSAGE_SIZE,
41 }
42 }
43}
44
/// Address of an IPC endpoint that can be bound or connected to.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum IpcEndpoint {
    /// Logical name resolved to the platform's native transport: a named pipe
    /// on Windows, a Unix domain socket on Unix.
    PlatformDefault { name: String },
    /// Windows named pipe, given either as a full `\\.\pipe\...` path or as a
    /// short name.
    NamedPipe { name: String },
    /// Unix domain socket located at `path`.
    UnixSocket { path: PathBuf },
    /// TCP socket at `addr`.
    Tcp { addr: SocketAddr },
}

impl IpcEndpoint {
    /// Creates a [`IpcEndpoint::PlatformDefault`] endpoint for `name`.
    pub fn platform_default(name: impl Into<String>) -> Self {
        let name = name.into();
        Self::PlatformDefault { name }
    }

    /// Creates a [`IpcEndpoint::Tcp`] endpoint for `addr`.
    pub fn tcp(addr: SocketAddr) -> Self {
        IpcEndpoint::Tcp { addr }
    }
}
62
63pub struct IpcListener {
64 inner: ListenerInner,
65 config: FrameConfig,
66 connection_scope: ConnectionScope,
67}
68
69enum ListenerInner {
70 Tcp(TcpListener),
71 #[cfg(unix)]
72 Unix(UnixListener),
73 #[cfg(windows)]
74 NamedPipe {
75 path: String,
76 pending: Option<NamedPipeServer>,
77 },
78}
79
80impl fmt::Debug for IpcListener {
81 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
82 formatter
83 .debug_struct("IpcListener")
84 .field("config", &self.config)
85 .finish_non_exhaustive()
86 }
87}
88
89impl IpcListener {
90 pub async fn bind(endpoint: IpcEndpoint, config: FrameConfig) -> Result<Self, TransportError> {
91 match normalize_endpoint(endpoint)? {
92 IpcEndpoint::Tcp { addr } => {
93 let listener = TcpListener::bind(addr).await?;
94 Ok(Self {
95 inner: ListenerInner::Tcp(listener),
96 config,
97 connection_scope: ConnectionScope::default(),
98 })
99 }
100 #[cfg(unix)]
101 IpcEndpoint::UnixSocket { path } => {
102 let listener = UnixListener::bind(path)?;
103 Ok(Self {
104 inner: ListenerInner::Unix(listener),
105 config,
106 connection_scope: ConnectionScope::default(),
107 })
108 }
109 #[cfg(windows)]
110 IpcEndpoint::NamedPipe { name } => {
111 let path = named_pipe_path(&name);
112 let pending = create_named_pipe_server(&path)?;
113 Ok(Self {
114 inner: ListenerInner::NamedPipe {
115 path,
116 pending: Some(pending),
117 },
118 config,
119 connection_scope: ConnectionScope::default(),
120 })
121 }
122 #[allow(unreachable_patterns)]
123 endpoint => Err(TransportError::runtime(
124 RuntimeErrorCode::UnsupportedCapability,
125 format!("endpoint `{endpoint:?}` is not supported on this platform"),
126 )),
127 }
128 }
129
130 pub fn local_addr(&self) -> Option<SocketAddr> {
131 match &self.inner {
132 ListenerInner::Tcp(listener) => listener.local_addr().ok(),
133 #[cfg(unix)]
134 ListenerInner::Unix(_) => None,
135 #[cfg(windows)]
136 ListenerInner::NamedPipe { .. } => None,
137 }
138 }
139
140 pub async fn accept(&mut self) -> Result<IpcConnection, TransportError> {
141 match &mut self.inner {
142 ListenerInner::Tcp(listener) => {
143 let (stream, peer_addr) = listener.accept().await?;
144 validate_peer_scope(peer_addr, self.connection_scope)?;
145 Ok(IpcConnection::from_stream(stream, self.config))
146 }
147 #[cfg(unix)]
148 ListenerInner::Unix(listener) => {
149 let (stream, _) = listener.accept().await?;
150 Ok(IpcConnection::from_stream(stream, self.config))
151 }
152 #[cfg(windows)]
153 ListenerInner::NamedPipe { path, pending } => {
154 let server = pending.take().ok_or_else(|| {
155 io::Error::new(io::ErrorKind::NotConnected, "no pending named pipe server")
156 })?;
157 server.connect().await?;
158 *pending = Some(create_named_pipe_server(path)?);
159 Ok(IpcConnection::from_stream(server, self.config))
160 }
161 }
162 }
163}
164
165pub struct IpcConnection {
166 inner: RpcConnection,
167}
168
169impl IpcConnection {
170 pub async fn connect(
171 endpoint: IpcEndpoint,
172 config: FrameConfig,
173 ) -> Result<Self, TransportError> {
174 match normalize_endpoint(endpoint)? {
175 IpcEndpoint::Tcp { addr } => {
176 let stream = TcpStream::connect(addr).await?;
177 Ok(Self::from_stream(stream, config))
178 }
179 #[cfg(unix)]
180 IpcEndpoint::UnixSocket { path } => {
181 let stream = UnixStream::connect(path).await?;
182 Ok(Self::from_stream(stream, config))
183 }
184 #[cfg(windows)]
185 IpcEndpoint::NamedPipe { name } => {
186 let client = ClientOptions::new().open(named_pipe_path(&name))?;
187 Ok(Self::from_stream(client, config))
188 }
189 #[allow(unreachable_patterns)]
190 endpoint => Err(TransportError::runtime(
191 RuntimeErrorCode::UnsupportedCapability,
192 format!("endpoint `{endpoint:?}` is not supported on this platform"),
193 )),
194 }
195 }
196
197 pub fn from_stream<T>(stream: T, config: FrameConfig) -> Self
198 where
199 T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
200 {
201 let (reader, writer) = split(stream);
202 let writer = Arc::new(IpcFrameWriter {
203 writer: Arc::new(Mutex::new(Box::new(writer))),
204 config,
205 });
206 let reader = Box::new(IpcFrameReader {
207 reader: Box::new(reader),
208 config,
209 });
210 Self {
211 inner: RpcConnection::new(RpcSender::new(writer), RpcReceiver::new(reader)),
212 }
213 }
214
215 pub fn split(self) -> (IpcSender, IpcReceiver) {
216 self.inner.split()
217 }
218
219 pub fn into_connection(self) -> RpcConnection {
220 self.inner
221 }
222}
223
224impl From<IpcConnection> for RpcConnection {
225 fn from(value: IpcConnection) -> Self {
226 value.into_connection()
227 }
228}
229
230impl RpcListener for IpcListener {
231 fn accept<'a>(&'a mut self) -> TransportFuture<'a, RpcConnection> {
232 Box::pin(async move { Ok(IpcListener::accept(self).await?.into_connection()) })
233 }
234
235 fn set_connection_scope(&mut self, scope: ConnectionScope) {
236 self.connection_scope = scope;
237 }
238}
239
240struct IpcFrameWriter {
241 writer: BoxedWriter,
242 config: FrameConfig,
243}
244
245impl EnvelopeWriter for IpcFrameWriter {
246 fn send_envelope<'a>(&'a self, envelope: &'a Envelope) -> TransportFuture<'a, ()> {
247 Box::pin(async move {
248 let bytes = encode_envelope(envelope)?;
249 let mut writer = self.writer.lock().await;
250 write_frame_bytes(&mut *writer, &bytes, self.config).await
251 })
252 }
253
254 fn shutdown<'a>(&'a self) -> TransportFuture<'a, ()> {
255 Box::pin(async move {
256 let mut writer = self.writer.lock().await;
257 writer.shutdown().await?;
258 Ok(())
259 })
260 }
261}
262
263struct IpcFrameReader {
264 reader: BoxedReader,
265 config: FrameConfig,
266}
267
268impl EnvelopeReader for IpcFrameReader {
269 fn recv_envelope<'a>(&'a mut self) -> TransportFuture<'a, Option<Envelope>> {
270 Box::pin(async move {
271 let Some(bytes) = read_frame_bytes(&mut *self.reader, self.config).await? else {
272 return Ok(None);
273 };
274 let envelope = match decode_envelope(
275 &bytes,
276 CodecLimits {
277 max_message_size: self.config.max_frame_size,
278 },
279 ) {
280 Ok(envelope) => envelope,
281 Err(err) => {
282 warn!(
283 frame_len = bytes.len(),
284 max_frame_size = self.config.max_frame_size,
285 error = %err,
286 "ipc transport failed to decode envelope"
287 );
288 return Err(err.into());
289 }
290 };
291 Ok(Some(envelope))
292 })
293 }
294}
295
296pub async fn write_frame_bytes<W>(
297 writer: &mut W,
298 payload: &[u8],
299 config: FrameConfig,
300) -> Result<(), TransportError>
301where
302 W: AsyncWrite + Unpin + ?Sized,
303{
304 validate_outbound_len(payload.len(), config)?;
305 trace!(
306 frame_len = payload.len(),
307 max_frame_size = config.max_frame_size,
308 "ipc transport writing frame"
309 );
310 writer
311 .write_all(&(payload.len() as u32).to_be_bytes())
312 .await?;
313 writer.write_all(payload).await?;
314 writer.flush().await?;
315 Ok(())
316}
317
318pub async fn read_frame_bytes<R>(
319 reader: &mut R,
320 config: FrameConfig,
321) -> Result<Option<Vec<u8>>, TransportError>
322where
323 R: AsyncRead + Unpin + ?Sized,
324{
325 let mut prefix = [0_u8; FRAME_LENGTH_PREFIX_SIZE];
326 match reader.read_exact(&mut prefix).await {
327 Ok(_) => {}
328 Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => {
329 debug!("ipc transport reached eof before frame prefix");
330 return Ok(None);
331 }
332 Err(err) => {
333 warn!(error = %err, "ipc transport failed to read frame prefix");
334 return Err(err.into());
335 }
336 }
337
338 let len = u32::from_be_bytes(prefix) as usize;
339 validate_inbound_len(len, config)?;
340 trace!(
341 frame_len = len,
342 max_frame_size = config.max_frame_size,
343 "ipc transport reading frame"
344 );
345 let mut payload = vec![0_u8; len];
346 reader.read_exact(&mut payload).await.map_err(|err| {
347 if err.kind() == io::ErrorKind::UnexpectedEof {
348 warn!(
349 frame_len = len,
350 error = %err,
351 "ipc transport peer disconnected before full frame payload was read"
352 );
353 io::Error::new(
354 io::ErrorKind::UnexpectedEof,
355 "peer disconnected before full frame payload was read",
356 )
357 } else {
358 warn!(
359 frame_len = len,
360 error = %err,
361 "ipc transport failed to read frame payload"
362 );
363 err
364 }
365 })?;
366 Ok(Some(payload))
367}
368
369fn validate_outbound_len(len: usize, config: FrameConfig) -> Result<(), TransportError> {
370 if len == 0 {
371 warn!("ipc transport rejected zero-length outbound frame");
372 return Err(TransportError::runtime(
373 RuntimeErrorCode::InvalidEnvelope,
374 "frame payload length must not be zero",
375 ));
376 }
377 if len > u32::MAX as usize {
378 warn!(
379 frame_len = len,
380 "ipc transport rejected outbound frame exceeding u32 range"
381 );
382 return Err(TransportError::runtime(
383 RuntimeErrorCode::InvalidEnvelope,
384 "frame payload length exceeds u32 range",
385 ));
386 }
387 if len > config.max_frame_size {
388 warn!(
389 frame_len = len,
390 max_frame_size = config.max_frame_size,
391 "ipc transport rejected oversized outbound frame"
392 );
393 return Err(TransportError::runtime(
394 RuntimeErrorCode::InvalidEnvelope,
395 format!(
396 "frame payload length {len} exceeds max frame size {}",
397 config.max_frame_size
398 ),
399 ));
400 }
401 validate_inbound_len(len, config)
402}
403
404fn validate_inbound_len(len: usize, config: FrameConfig) -> Result<(), TransportError> {
405 if len == 0 {
406 warn!("ipc transport rejected zero-length inbound frame");
407 return Err(TransportError::runtime(
408 RuntimeErrorCode::InvalidEnvelope,
409 "frame payload length must not be zero",
410 ));
411 }
412 if len > config.max_frame_size {
413 warn!(
414 frame_len = len,
415 max_frame_size = config.max_frame_size,
416 "ipc transport rejected oversized inbound frame"
417 );
418 return Err(TransportError::runtime(
419 RuntimeErrorCode::InvalidEnvelope,
420 format!(
421 "frame payload length {len} exceeds max frame size {}",
422 config.max_frame_size
423 ),
424 ));
425 }
426 Ok(())
427}
428
429fn validate_peer_scope(addr: SocketAddr, scope: ConnectionScope) -> Result<(), TransportError> {
430 if scope == ConnectionScope::LocalOnly && !is_local_socket_addr(&addr) {
431 warn!(
432 peer_addr = %addr,
433 "ipc transport rejected non-local TCP peer"
434 );
435 return Err(TransportError::runtime(
436 RuntimeErrorCode::AccessDenied,
437 "non-local TCP peer is not allowed",
438 ));
439 }
440 trace!(peer_addr = %addr, "ipc transport accepted TCP peer");
441 Ok(())
442}
443
444fn normalize_endpoint(endpoint: IpcEndpoint) -> Result<IpcEndpoint, TransportError> {
445 match endpoint {
446 IpcEndpoint::PlatformDefault { name } => {
447 #[cfg(windows)]
448 {
449 Ok(IpcEndpoint::NamedPipe { name })
450 }
451 #[cfg(unix)]
452 {
453 Ok(IpcEndpoint::UnixSocket {
454 path: PathBuf::from(format!("/tmp/tripley-rpc-{name}.sock")),
455 })
456 }
457 #[cfg(not(any(windows, unix)))]
458 {
459 let _ = name;
460 Err(TransportError::runtime(
461 RuntimeErrorCode::UnsupportedCapability,
462 "platform default IPC endpoint is not supported on this platform",
463 ))
464 }
465 }
466 endpoint => Ok(endpoint),
467 }
468}
469
470#[cfg(windows)]
/// Expands a pipe name into a full Windows named pipe path.
///
/// Names that already start with `\\.\pipe\` are returned unchanged; any
/// other name is placed under `\\.\pipe\` with a `tripley-rpc-` prefix.
fn named_pipe_path(name: &str) -> String {
    const PIPE_NAMESPACE: &str = r"\\.\pipe\";
    match name.strip_prefix(PIPE_NAMESPACE) {
        Some(_) => name.to_owned(),
        None => format!(r"\\.\pipe\tripley-rpc-{name}"),
    }
}
478
/// Creates a message-mode named pipe server instance at `path`.
///
/// `first_pipe_instance(false)` is used because the listener creates a new
/// instance after every accepted client.
// NOTE(review): the initial bind also goes through here, so it does not fail
// if another process already owns the pipe name — confirm this is intended.
#[cfg(windows)]
fn create_named_pipe_server(path: &str) -> io::Result<NamedPipeServer> {
    let mut options = ServerOptions::new();
    options
        .pipe_mode(PipeMode::Message)
        .first_pipe_instance(false);
    options.create(path)
}
486
487type BoxedReader = Box<dyn AsyncRead + Unpin + Send>;
488type BoxedWriter = Arc<Mutex<Box<dyn AsyncWrite + Unpin + Send>>>;
489
#[cfg(test)]
mod tests {
    use super::*;
    use rmpv::Value;
    use rpc_runtime_core::{InstanceId, MethodId, Request, RequestId, ResponseOk};
    use tokio::io::duplex;

    /// Builds a minimal request envelope carrying `request_id`.
    fn request_envelope(request_id: u64) -> Envelope {
        let request = Request {
            request_id: RequestId::new(request_id),
            instance_id: InstanceId::new(1).expect("non-zero instance id"),
            method_id: MethodId::new(2),
            payload: Value::Nil,
        };
        Envelope::Request(request)
    }

    /// Builds an empty successful response envelope for `request_id`.
    fn ok_response_envelope(request_id: u64) -> Envelope {
        Envelope::ResponseOk(ResponseOk {
            request_id: RequestId::new(request_id),
            payload: Value::Nil,
        })
    }

    /// Asserts that `error` is a runtime error carrying `code`.
    fn assert_error_code(error: TransportError, code: RuntimeErrorCode) {
        match error {
            TransportError::Io(error) => panic!("expected runtime error, got I/O error: {error}"),
            TransportError::Runtime(error) => assert_eq!(error.code, code),
        }
    }

    /// Produces a name that differs per process and per call time.
    fn unique_test_name(prefix: &str) -> String {
        let since_epoch = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .expect("system clock after unix epoch");
        let nanos = since_epoch.as_nanos();
        format!("{prefix}-{}-{nanos}", std::process::id())
    }

    /// A frame written on one end of a duplex pipe is read back intact.
    #[tokio::test]
    async fn frame_bytes_roundtrip_over_duplex() {
        let (mut client, mut server) = duplex(1024);
        let payload = vec![1, 2, 3, 4];

        let writer_task = tokio::spawn(async move {
            write_frame_bytes(&mut client, &payload, FrameConfig::default()).await
        });
        let received = read_frame_bytes(&mut server, FrameConfig::default())
            .await
            .expect("read frame");

        writer_task.await.expect("writer task").expect("write frame");
        assert_eq!(received, Some(vec![1, 2, 3, 4]));
    }

    /// Back-to-back frames come out in the order they were written.
    #[tokio::test]
    async fn consecutive_frames_are_read_in_order() {
        let (mut client, mut server) = duplex(1024);
        write_frame_bytes(&mut client, &[1], FrameConfig::default())
            .await
            .expect("write first frame");
        write_frame_bytes(&mut client, &[2, 3], FrameConfig::default())
            .await
            .expect("write second frame");

        let first = read_frame_bytes(&mut server, FrameConfig::default())
            .await
            .expect("read first");
        assert_eq!(first, Some(vec![1]));

        let second = read_frame_bytes(&mut server, FrameConfig::default())
            .await
            .expect("read second");
        assert_eq!(second, Some(vec![2, 3]));
    }

    /// A prefix announcing zero bytes is rejected as an invalid envelope.
    #[tokio::test]
    async fn zero_length_frame_fails() {
        let (mut client, mut server) = duplex(1024);
        client.write_all(&0_u32.to_be_bytes()).await.expect("write");

        let err = read_frame_bytes(&mut server, FrameConfig::default())
            .await
            .expect_err("must fail");
        assert_error_code(err, RuntimeErrorCode::InvalidEnvelope);
    }

    /// A prefix larger than the configured maximum is rejected from the
    /// prefix alone; no payload bytes are ever sent.
    #[tokio::test]
    async fn oversized_frame_fails_before_payload_allocation() {
        let (mut client, mut server) = duplex(1024);
        client.write_all(&8_u32.to_be_bytes()).await.expect("write");

        let tight = FrameConfig { max_frame_size: 4 };
        let err = read_frame_bytes(&mut server, tight)
            .await
            .expect_err("must fail");
        assert_error_code(err, RuntimeErrorCode::InvalidEnvelope);
    }

    /// Closing the stream before any prefix byte yields `None`, not an error.
    #[tokio::test]
    async fn clean_eof_before_prefix_returns_none() {
        let (client, mut server) = duplex(1024);
        drop(client);

        let frame = read_frame_bytes(&mut server, FrameConfig::default())
            .await
            .expect("read eof");
        assert_eq!(frame, None);
    }

    /// Closing the stream part-way through a payload surfaces as an I/O error.
    #[tokio::test]
    async fn truncated_payload_is_io_error() {
        let (mut client, mut server) = duplex(1024);
        client
            .write_all(&4_u32.to_be_bytes())
            .await
            .expect("prefix");
        client.write_all(&[1, 2]).await.expect("partial payload");
        drop(client);

        let err = read_frame_bytes(&mut server, FrameConfig::default())
            .await
            .expect_err("must fail");
        assert!(matches!(err, TransportError::Io(_)));
    }

    /// Request/response exchange over a loopback TCP listener.
    #[tokio::test]
    async fn tcp_loopback_echoes_envelopes() {
        let bind_endpoint = IpcEndpoint::Tcp {
            addr: "127.0.0.1:0".parse().expect("loopback addr"),
        };
        let mut listener = IpcListener::bind(bind_endpoint, FrameConfig::default())
            .await
            .expect("bind tcp");
        let addr = listener.local_addr().expect("local addr");

        let server = tokio::spawn(async move {
            let connection = listener.accept().await.expect("accept");
            let (sender, mut receiver) = connection.split();
            let envelope = receiver
                .recv_envelope()
                .await
                .expect("receive request")
                .expect("request envelope");
            assert!(matches!(envelope, Envelope::Request(_)));
            sender
                .send_envelope(&ok_response_envelope(1))
                .await
                .expect("send response");
        });

        let connection = IpcConnection::connect(IpcEndpoint::Tcp { addr }, FrameConfig::default())
            .await
            .expect("connect tcp");
        let (sender, mut receiver) = connection.split();
        sender
            .send_envelope(&request_envelope(1))
            .await
            .expect("send request");
        let response = receiver
            .recv_envelope()
            .await
            .expect("receive response")
            .expect("response envelope");

        assert!(matches!(response, Envelope::ResponseOk(_)));
        server.await.expect("server task");
    }

    /// Sixteen tasks sending through clones of one sender all arrive as
    /// complete, decodable envelopes.
    #[tokio::test]
    async fn cloned_senders_serialize_concurrent_writes() {
        let (client, server) = duplex(4096);
        let client = IpcConnection::from_stream(client, FrameConfig::default());
        let server = IpcConnection::from_stream(server, FrameConfig::default());
        let (sender, _) = client.split();
        let (_, mut receiver) = server.split();

        let mut tasks = Vec::new();
        for id in 1..=16 {
            let sender = sender.clone();
            let task = tokio::spawn(async move {
                sender
                    .send_envelope(&request_envelope(id))
                    .await
                    .expect("send request");
            });
            tasks.push(task);
        }
        for task in tasks {
            task.await.expect("send task");
        }

        let mut ids = Vec::new();
        for _ in 0..16 {
            let received = receiver.recv_envelope().await.expect("receive request");
            let Some(Envelope::Request(request)) = received else {
                panic!("expected request envelope");
            };
            ids.push(request.request_id.get());
        }
        // Arrival order is not asserted, only that every id arrived once.
        ids.sort_unstable();
        assert_eq!(ids, (1..=16).collect::<Vec<_>>());
    }

    /// Request/response exchange over a Windows named pipe.
    #[cfg(windows)]
    #[tokio::test]
    async fn windows_named_pipe_echoes_envelopes() {
        let name = unique_test_name("transport-test");
        let mut listener = IpcListener::bind(
            IpcEndpoint::NamedPipe { name: name.clone() },
            FrameConfig::default(),
        )
        .await
        .expect("bind named pipe");

        let server = tokio::spawn(async move {
            let connection = listener.accept().await.expect("accept");
            let (sender, mut receiver) = connection.split();
            receiver
                .recv_envelope()
                .await
                .expect("receive")
                .expect("envelope");
            sender
                .send_envelope(&ok_response_envelope(1))
                .await
                .expect("send");
        });

        let connection =
            IpcConnection::connect(IpcEndpoint::NamedPipe { name }, FrameConfig::default())
                .await
                .expect("connect named pipe");
        let (sender, mut receiver) = connection.split();
        sender
            .send_envelope(&request_envelope(1))
            .await
            .expect("send request");
        let response = receiver
            .recv_envelope()
            .await
            .expect("receive response")
            .expect("response");
        assert!(matches!(response, Envelope::ResponseOk(_)));
        server.await.expect("server task");
    }

    /// Request/response exchange over a Unix domain socket in the temp dir.
    #[cfg(unix)]
    #[tokio::test]
    async fn unix_socket_echoes_envelopes() {
        let path = std::env::temp_dir().join(format!("{}.sock", unique_test_name("tripley-rpc")));
        let mut listener = IpcListener::bind(
            IpcEndpoint::UnixSocket { path: path.clone() },
            FrameConfig::default(),
        )
        .await
        .expect("bind unix socket");

        let server = tokio::spawn(async move {
            let connection = listener.accept().await.expect("accept");
            let (sender, mut receiver) = connection.split();
            receiver
                .recv_envelope()
                .await
                .expect("receive")
                .expect("envelope");
            sender
                .send_envelope(&ok_response_envelope(1))
                .await
                .expect("send");
        });

        let connection = IpcConnection::connect(
            IpcEndpoint::UnixSocket { path: path.clone() },
            FrameConfig::default(),
        )
        .await
        .expect("connect unix socket");
        let (sender, mut receiver) = connection.split();
        sender
            .send_envelope(&request_envelope(1))
            .await
            .expect("send request");
        let response = receiver
            .recv_envelope()
            .await
            .expect("receive response")
            .expect("response");
        assert!(matches!(response, Envelope::ResponseOk(_)));
        server.await.expect("server task");
        std::fs::remove_file(path).expect("remove unix socket");
    }

    /// `LocalOnly` lets a loopback peer through.
    #[test]
    fn local_only_accepts_loopback_tcp_peer() {
        let peer = "127.0.0.1:1000".parse().expect("addr");
        validate_peer_scope(peer, ConnectionScope::LocalOnly).expect("local peer");
    }

    /// `LocalOnly` turns away a non-loopback peer with `AccessDenied`.
    #[test]
    fn local_only_rejects_non_loopback_tcp_peer() {
        let peer = "192.0.2.10:1000".parse().expect("addr");
        let err = validate_peer_scope(peer, ConnectionScope::LocalOnly).expect_err("must reject");

        assert_error_code(err, RuntimeErrorCode::AccessDenied);
    }

    /// `RemoteAllowed` lets a non-loopback peer through.
    #[test]
    fn remote_allowed_accepts_non_loopback_tcp_peer() {
        let peer = "192.0.2.10:1000".parse().expect("addr");
        validate_peer_scope(peer, ConnectionScope::RemoteAllowed).expect("remote peer");
    }
}