fizyr_rpc/transport/
mod.rs

1//! Transport traits and concrete implementations.
2//!
3//! Transports are responsible for passing raw messages to a remote peer.
4//! They are used by the [`Peer`][crate::Peer] struct to implement higher level RPC communication.
5//!
6//! Specific transports must be enabled with individual feature flags.
7//! None of the concrete transport implementations are enabled by default.
8
9use std::future::Future;
10use std::pin::Pin;
11use std::task::{Context, Poll};
12
13use crate::{Error, Message, MessageHeader};
14
15mod endian;
16pub use endian::Endian;
17
18pub(crate) mod stream;
19pub use stream::StreamTransport;
20
21#[cfg(feature = "tcp")]
22pub use stream::TcpStreamInfo;
23
24#[cfg(feature = "unix-stream")]
25pub use stream::UnixStreamInfo;
26
27pub(crate) mod unix;
28pub use unix::UnixTransport;
29
30#[cfg(feature = "unix-seqpacket")]
31pub use unix::UnixSeqpacketInfo;
32
33/// Trait for types that represent a bi-direction message transport.
34///
35/// Note that you can not use the transport itself directly.
36/// Instead, you must split it in a read and write half and use those.
37pub trait Transport: Send + 'static {
38	/// The body type for the messages.
39	type Body: crate::Body;
40
41	/// Information about the underlying stream or connection of the transport.
42	type Info: Clone + Send + 'static;
43
44	/// The configuration type for the transport.
45	type Config: Clone + Default + Send + Sync + 'static;
46
47	/// The type of the read half of the transport.
48	type ReadHalf<'a>: TransportReadHalf<Body = Self::Body> + 'a;
49
50	/// The type of the write half of the transport.
51	type WriteHalf<'a>: TransportWriteHalf<Body = Self::Body> + 'a;
52
53	/// Split the transport into a read half and a write half.
54	fn split(&mut self) -> (Self::ReadHalf<'_>, Self::WriteHalf<'_>);
55
56	/// Get information about the peer on the other end of the transport.
57	///
58	/// For TCP streams, this includes a socket address with an IP address and port number.
59	/// For Unix streams and seqpacket streams this includes the credentials of the remote process.
60	fn info(&self) -> std::io::Result<Self::Info>;
61}
62
63/// An error from the transport layer.
64///
65/// This is a regular [`crate::Error`],
66/// but also indicates if it is fatal for the transport or not.
67#[derive(Debug)]
68pub struct TransportError {
69	/// The actual error that occured.
70	inner: Error,
71
72	/// If true, the error was fatal and the transport is no longer usable.
73	is_fatal: bool,
74}
75
76impl TransportError {
77	/// Create a new fatal transport error from an inner error.
78	///
79	/// After a transport returns a fatal error, the transport should not be used anymore.
80	fn new_fatal(inner: impl Into<Error>) -> Self {
81		Self {
82			inner: inner.into(),
83			is_fatal: true,
84		}
85	}
86
87	/// Create a new non-fatal transport error from an inner error.
88	///
89	/// A transport may still be used after returning a non-fatal error.
90	fn new_non_fatal(inner: impl Into<Error>) -> Self {
91		Self {
92			inner: inner.into(),
93			is_fatal: false,
94		}
95	}
96
97	/// Get the inner error.
98	pub fn inner(&self) -> &Error {
99		&self.inner
100	}
101
102	/// Consume `self` to get the inner error.
103	pub fn into_inner(self) -> Error {
104		self.inner
105	}
106
107	/// Check if the error is fatal for the transport.
108	///
109	/// If the error is fatal,
110	/// the transport that generated it is no longer usable.
111	pub fn is_fatal(&self) -> bool {
112		self.is_fatal
113	}
114}
115
116impl std::error::Error for TransportError {}
117
118impl std::fmt::Display for TransportError {
119	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
120		self.inner.fmt(f)
121	}
122}
123
124/// Trait for the read half of a transport type.
125pub trait TransportReadHalf: Send + Unpin {
126	/// The body type for messages transferred over the transport.
127	type Body: crate::Body;
128
129	/// Try to read a message from the transport without blocking.
130	///
131	/// This function may read partial messages into an internal buffer.
132	///
133	/// If the function returns [`Poll::Pending`],
134	/// the current task is scheduled to wake when more data is available.
135	fn poll_read_msg(self: Pin<&mut Self>, context: &mut Context) -> Poll<Result<Message<Self::Body>, TransportError>>;
136
137	/// Asynchronously read a complete message from the transport.
138	fn read_msg(&mut self) -> ReadMsg<Self>
139	where
140		Self: Unpin,
141	{
142		ReadMsg { inner: self }
143	}
144}
145
146/// Trait for transport types that you can write message to.
147pub trait TransportWriteHalf: Send + Unpin {
148	/// The body type for messages transferred over the transport.
149	type Body: crate::Body;
150
151	/// Try to write a message to the transport without blocking.
152	///
153	/// This function may write only part of the message.
154	/// The next invocation will skip the already written bytes.
155	///
156	/// It is an error to change the value of the `header` and `body` parameters between invocations
157	/// as long as the function returns [`Poll::Pending`].
158	/// An implementation may write spliced messages over the transport if you do.
159	/// It is allowed to *move* the header and body in between invocations though,
160	/// as long as the values remain the same.
161	///
162	/// If the function returns [`Poll::Pending`],
163	/// the current task is scheduled to wake when the transport is ready for more data.
164	fn poll_write_msg(self: Pin<&mut Self>, context: &mut Context, header: &MessageHeader, body: &Self::Body) -> Poll<Result<(), TransportError>>;
165
166	/// Asynchronously write a message to the transport.
167	fn write_msg<'c>(&'c mut self, header: &'c MessageHeader, body: &'c Self::Body) -> WriteMsg<Self> {
168		WriteMsg { inner: self, header, body }
169	}
170}
171
172/// Future type for [`TransportReadHalf::read_msg`].
173pub struct ReadMsg<'c, T>
174where
175	T: TransportReadHalf + ?Sized,
176{
177	inner: &'c mut T,
178}
179
180/// Future type for [`TransportWriteHalf::write_msg`].
181pub struct WriteMsg<'c, T>
182where
183	T: TransportWriteHalf + ?Sized,
184{
185	inner: &'c mut T,
186	header: &'c MessageHeader,
187	body: &'c T::Body,
188}
189
190impl<T> Future for ReadMsg<'_, T>
191where
192	T: TransportReadHalf + ?Sized + Unpin,
193{
194	type Output = Result<Message<T::Body>, TransportError>;
195
196	fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
197		Pin::new(&mut *self.get_mut().inner).poll_read_msg(cx)
198	}
199}
200
201impl<T> Future for WriteMsg<'_, T>
202where
203	T: TransportWriteHalf + ?Sized + Unpin,
204{
205	type Output = Result<(), TransportError>;
206
207	fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
208		let header = self.header;
209		let body = self.body;
210		Pin::new(&mut *self.get_mut().inner).poll_write_msg(cx, header, body)
211	}
212}
213
214impl<T> TransportReadHalf for &'_ mut T
215where
216	T: TransportReadHalf + Unpin + ?Sized,
217{
218	type Body = T::Body;
219
220	fn poll_read_msg(self: Pin<&mut Self>, context: &mut Context) -> Poll<Result<Message<Self::Body>, TransportError>> {
221		T::poll_read_msg(Pin::new(*self.get_mut()), context)
222	}
223}
224
225impl<T> TransportReadHalf for Box<T>
226where
227	T: TransportReadHalf + Unpin + ?Sized,
228{
229	type Body = T::Body;
230
231	fn poll_read_msg(self: Pin<&mut Self>, context: &mut Context) -> Poll<Result<Message<Self::Body>, TransportError>> {
232		T::poll_read_msg(Pin::new(&mut *self.get_mut()), context)
233	}
234}
235
236impl<P> TransportReadHalf for Pin<P>
237where
238	P: std::ops::DerefMut + Send + Unpin,
239	P::Target: TransportReadHalf,
240{
241	type Body = <P::Target as TransportReadHalf>::Body;
242
243	fn poll_read_msg(self: Pin<&mut Self>, context: &mut Context) -> Poll<Result<Message<Self::Body>, TransportError>> {
244		P::Target::poll_read_msg(Pin::new(&mut *self.get_mut()), context)
245	}
246}
247
248impl<T> TransportWriteHalf for &'_ mut T
249where
250	T: TransportWriteHalf + Unpin + ?Sized,
251{
252	type Body = T::Body;
253
254	fn poll_write_msg(
255		self: Pin<&mut Self>,
256		context: &mut Context,
257		header: &MessageHeader,
258		body: &Self::Body,
259	) -> Poll<Result<(), TransportError>> {
260		T::poll_write_msg(Pin::new(*self.get_mut()), context, header, body)
261	}
262}
263
264impl<T> TransportWriteHalf for Box<T>
265where
266	T: TransportWriteHalf + Unpin + ?Sized,
267{
268	type Body = T::Body;
269
270	fn poll_write_msg(
271		self: Pin<&mut Self>,
272		context: &mut Context,
273		header: &MessageHeader,
274		body: &Self::Body,
275	) -> Poll<Result<(), TransportError>> {
276		T::poll_write_msg(Pin::new(&mut *self.get_mut()), context, header, body)
277	}
278}
279
280impl<P> TransportWriteHalf for Pin<P>
281where
282	P: std::ops::DerefMut + Send + Unpin,
283	P::Target: TransportWriteHalf,
284{
285	type Body = <P::Target as TransportWriteHalf>::Body;
286
287	fn poll_write_msg(self: Pin<&mut Self>, context: &mut Context, header: &MessageHeader, body: &Self::Body) -> Poll<Result<(), TransportError>> {
288		P::Target::poll_write_msg(Pin::new(&mut *self.get_mut()), context, header, body)
289	}
290}