1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
//! Transport traits and concrete implementations.
//!
//! Transports are responsible for passing raw messages to a remote peer.
//! They are used by the [`Peer`][crate::Peer] struct to implement higher level RPC communication.
//!
//! Specific transports must be enabled with individual feature flags.
//! None of the concrete transport implementations are enabled by default.

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use crate::error::{ReadMessageError, WriteMessageError};
use crate::{Message, MessageHeader};

#[cfg(any(feature = "unix-stream", feature = "tcp"))]
pub(crate) mod stream;

#[cfg(any(feature = "unix-stream", feature = "tcp"))]
pub use stream::StreamTransport;

#[cfg(feature = "unix-seqpacket")]
pub use unix::UnixTransport;

#[cfg(feature = "unix-seqpacket")]
pub(crate) mod unix;

/// Trait for types that represent a bidirectional message transport.
///
/// Note that you can not use the transport itself directly.
/// Instead, you must split it into a read half and a write half with [`Transport::split`] and use those.
pub trait Transport: Send + 'static {
	/// The body type for the messages.
	///
	/// Both the read half and the write half are required to use this same body type.
	type Body: crate::Body;

	/// The configuration type for the transport.
	type Config: Clone + Default + Send + Sync + 'static;

	/// The type of the read half of the transport.
	///
	/// This is a helper type that must implement [`ReadHalfType`] for every lifetime.
	/// The concrete read half for a borrow with lifetime `'a` is `<Self::ReadHalf as ReadHalfType<'a>>::ReadHalf`.
	type ReadHalf: for<'a> ReadHalfType<'a, Body = Self::Body>;

	/// The type of the write half of the transport.
	///
	/// This is a helper type that must implement [`WriteHalfType`] for every lifetime.
	/// The concrete write half for a borrow with lifetime `'a` is `<Self::WriteHalf as WriteHalfType<'a>>::WriteHalf`.
	type WriteHalf: for<'a> WriteHalfType<'a, Body = Self::Body>;

	/// Split the transport into a read half and a write half.
	///
	/// The types of the returned halves are parameterized by the lifetime of the mutable borrow of `self`.
	// The lifetime is named because it is used in the projections of the return type.
	// NOTE(review): the `allow` presumably silences a clippy false positive for this case — confirm it is still needed.
	#[allow(clippy::needless_lifetimes)]
	fn split<'a>(&'a mut self) -> (<Self::ReadHalf as ReadHalfType<'a>>::ReadHalf, <Self::WriteHalf as WriteHalfType<'a>>::WriteHalf);
}

// TODO: Replace this with a generic associated type once it hits stable.
// NOTE(review): generic associated types appear to be stable since Rust 1.65 — confirm against the crate's MSRV.
// Switching would change the public API of `Transport`, so it needs a breaking release.
/// Helper trait to define the type of a read half for a transport.
///
/// Used to work around the lack of generic associated types.
///
/// The lifetime parameter `'a` is the lifetime of the borrow of the transport that the read half is created from
/// (see the signature of [`Transport::split`]).
pub trait ReadHalfType<'a> {
	/// The body type for the transport.
	type Body: crate::Body;

	/// The concrete type of the read half.
	///
	/// It must read messages with the same body type as [`Self::Body`].
	type ReadHalf: TransportReadHalf<Body = Self::Body>;
}

// TODO: Replace this with a generic associated type once it hits stable.
// NOTE(review): generic associated types appear to be stable since Rust 1.65 — confirm against the crate's MSRV.
// Switching would change the public API of `Transport`, so it needs a breaking release.
/// Helper trait to define the type of a write half for a transport.
///
/// Used to work around the lack of generic associated types.
///
/// The lifetime parameter `'a` is the lifetime of the borrow of the transport that the write half is created from
/// (see the signature of [`Transport::split`]).
pub trait WriteHalfType<'a> {
	/// The body type for the transport.
	type Body: crate::Body;

	/// The concrete type of the write half.
	///
	/// It must write messages with the same body type as [`Self::Body`].
	type WriteHalf: TransportWriteHalf<Body = Self::Body>;
}

/// The receiving side of a transport.
///
/// Implementors are required to be [`Send`] and [`Unpin`].
pub trait TransportReadHalf: Send + Unpin {
	/// The body type of the messages that are read from the transport.
	type Body: crate::Body;

	/// Poll the transport for a complete message without blocking.
	///
	/// An implementation is allowed to store partially received messages in an internal buffer
	/// and continue with them on the next invocation.
	///
	/// When [`Poll::Pending`] is returned,
	/// the current task is scheduled to be woken when more data is available.
	fn poll_read_msg(
		self: Pin<&mut Self>,
		context: &mut Context<'_>,
	) -> Poll<Result<Message<Self::Body>, ReadMessageError>>;

	/// Get a future that reads a complete message from the transport.
	///
	/// The returned [`ReadMsg`] holds the mutable borrow of this read half.
	fn read_msg(&mut self) -> ReadMsg<'_, Self>
	where
		Self: Unpin,
	{
		let inner = self;
		ReadMsg { inner }
	}
}

/// The sending side of a transport: a type that you can write messages to.
///
/// Implementors are required to be [`Send`] and [`Unpin`].
pub trait TransportWriteHalf: Send + Unpin {
	/// The body type of the messages that are written to the transport.
	type Body: crate::Body;

	/// Poll the transport to write a message without blocking.
	///
	/// A single invocation may write only a part of the message.
	/// The following invocation skips the bytes that were already written.
	///
	/// While this function keeps returning [`Poll::Pending`],
	/// the values of `header` and `body` must not change between invocations.
	/// If they do, an implementation may write spliced messages over the transport.
	/// *Moving* the header and the body between invocations is allowed,
	/// as long as their values stay the same.
	///
	/// When [`Poll::Pending`] is returned,
	/// the current task is scheduled to be woken when the transport is ready for more data.
	fn poll_write_msg(
		self: Pin<&mut Self>,
		context: &mut Context<'_>,
		header: &MessageHeader,
		body: &Self::Body,
	) -> Poll<Result<(), WriteMessageError>>;

	/// Get a future that writes a message to the transport.
	///
	/// The returned [`WriteMsg`] borrows this write half, the header and the body for the same lifetime.
	fn write_msg<'c>(&'c mut self, header: &'c MessageHeader, body: &'c Self::Body) -> WriteMsg<'c, Self> {
		let inner = self;
		WriteMsg { inner, header, body }
	}
}

/// Future type for [`TransportReadHalf::read_msg`].
///
/// The future holds a mutable borrow of the read half for as long as it exists.
/// Like all futures, it does nothing until it is polled or awaited,
/// so the type is marked `#[must_use]` to catch a result that is dropped by accident.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadMsg<'c, T>
where
	T: TransportReadHalf + ?Sized,
{
	/// The read half that is polled for the message.
	inner: &'c mut T,
}

/// Future type for [`TransportWriteHalf::write_msg`].
///
/// The future borrows the write half, the message header and the message body for as long as it exists.
/// Like all futures, it does nothing until it is polled or awaited,
/// so the type is marked `#[must_use]` to catch a result that is dropped by accident.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct WriteMsg<'c, T>
where
	T: TransportWriteHalf + ?Sized,
{
	/// The write half that the message is written to.
	inner: &'c mut T,
	/// The header of the message to write.
	header: &'c MessageHeader,
	/// The body of the message to write.
	body: &'c T::Body,
}

/// Polling the future polls the wrapped read half for a message.
impl<T> Future for ReadMsg<'_, T>
where
	T: TransportReadHalf + ?Sized + Unpin,
{
	type Output = Result<Message<T::Body>, ReadMessageError>;

	fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
		// The future only holds a reference, so it can be moved out of the pin freely.
		let this = self.get_mut();
		// `T` is `Unpin`, so the borrowed read half can be pinned again with `Pin::new`.
		T::poll_read_msg(Pin::new(&mut *this.inner), cx)
	}
}

/// Polling the future polls the wrapped write half with the stored header and body.
impl<T> Future for WriteMsg<'_, T>
where
	T: TransportWriteHalf + ?Sized + Unpin,
{
	type Output = Result<(), WriteMessageError>;

	fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
		// The future only holds references, so it can be moved out of the pin freely.
		let this = self.get_mut();
		// `T` is `Unpin`, so the borrowed write half can be pinned again with `Pin::new`.
		let target = Pin::new(&mut *this.inner);
		T::poll_write_msg(target, cx, this.header, this.body)
	}
}

/// A mutable reference to a read half is itself a read half that forwards to the referenced value.
impl<T> TransportReadHalf for &'_ mut T
where
	T: TransportReadHalf + Unpin + ?Sized,
{
	type Body = T::Body;

	/// Poll the referenced read half for a message.
	fn poll_read_msg(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<Result<Message<Self::Body>, ReadMessageError>> {
		// Name `T` explicitly and pin the pointee itself.
		// `self.as_mut()` would only reborrow `Pin<&mut Self>`,
		// so a method call on it resolves to this very impl and recurses forever.
		// `Pin::new` is available because `T` is `Unpin`.
		T::poll_read_msg(Pin::new(&mut **self), context)
	}
}

/// A boxed read half is itself a read half that forwards to the boxed value.
impl<T> TransportReadHalf for Box<T>
where
	T: TransportReadHalf + Unpin + ?Sized,
{
	type Body = T::Body;

	/// Poll the boxed read half for a message.
	fn poll_read_msg(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<Result<Message<Self::Body>, ReadMessageError>> {
		// Name `T` explicitly and pin the boxed value itself.
		// `self.as_mut()` would only reborrow `Pin<&mut Self>`,
		// so a method call on it resolves to this very impl and recurses forever.
		// `Pin::new` is available because `T` is `Unpin`.
		T::poll_read_msg(Pin::new(&mut **self), context)
	}
}

/// A pinned pointer to a read half is itself a read half that forwards to the pointee.
impl<P> TransportReadHalf for Pin<P>
where
	P: std::ops::DerefMut + Send + Unpin,
	P::Target: TransportReadHalf,
{
	type Body = <P::Target as TransportReadHalf>::Body;

	fn poll_read_msg(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Result<Message<Self::Body>, ReadMessageError>> {
		// `Pin<P>` is `Unpin` because `P` is, so the outer pin can be removed to reach the inner one.
		let pointer: &mut Pin<P> = self.get_mut();
		// Reborrow the inner pin as `Pin<&mut P::Target>` and poll the pointee.
		<P::Target as TransportReadHalf>::poll_read_msg(Pin::as_mut(pointer), context)
	}
}

/// A mutable reference to a write half is itself a write half that forwards to the referenced value.
impl<T> TransportWriteHalf for &'_ mut T
where
	T: TransportWriteHalf + Unpin + ?Sized,
{
	type Body = T::Body;

	/// Poll the referenced write half to write the message.
	fn poll_write_msg(
		mut self: Pin<&mut Self>,
		context: &mut Context,
		header: &MessageHeader,
		body: &Self::Body,
	) -> Poll<Result<(), WriteMessageError>> {
		// Name `T` explicitly and pin the pointee itself.
		// `self.as_mut()` would only reborrow `Pin<&mut Self>`,
		// so a method call on it resolves to this very impl and recurses forever.
		// `Pin::new` is available because `T` is `Unpin`.
		T::poll_write_msg(Pin::new(&mut **self), context, header, body)
	}
}

/// A boxed write half is itself a write half that forwards to the boxed value.
impl<T> TransportWriteHalf for Box<T>
where
	T: TransportWriteHalf + Unpin + ?Sized,
{
	type Body = T::Body;

	/// Poll the boxed write half to write the message.
	fn poll_write_msg(
		mut self: Pin<&mut Self>,
		context: &mut Context,
		header: &MessageHeader,
		body: &Self::Body,
	) -> Poll<Result<(), WriteMessageError>> {
		// Name `T` explicitly and pin the boxed value itself.
		// `self.as_mut()` would only reborrow `Pin<&mut Self>`,
		// so a method call on it resolves to this very impl and recurses forever.
		// `Pin::new` is available because `T` is `Unpin`.
		T::poll_write_msg(Pin::new(&mut **self), context, header, body)
	}
}

/// A pinned pointer to a write half is itself a write half that forwards to the pointee.
impl<P> TransportWriteHalf for Pin<P>
where
	P: std::ops::DerefMut + Send + Unpin,
	P::Target: TransportWriteHalf,
{
	type Body = <P::Target as TransportWriteHalf>::Body;

	fn poll_write_msg(
		self: Pin<&mut Self>,
		context: &mut Context<'_>,
		header: &MessageHeader,
		body: &Self::Body,
	) -> Poll<Result<(), WriteMessageError>> {
		// `Pin<P>` is `Unpin` because `P` is, so the outer pin can be removed to reach the inner one.
		let pointer: &mut Pin<P> = self.get_mut();
		// Reborrow the inner pin as `Pin<&mut P::Target>` and poll the pointee.
		<P::Target as TransportWriteHalf>::poll_write_msg(Pin::as_mut(pointer), context, header, body)
	}
}