fizyr_rpc/transport/
mod.rs1use std::future::Future;
10use std::pin::Pin;
11use std::task::{Context, Poll};
12
13use crate::{Error, Message, MessageHeader};
14
15mod endian;
16pub use endian::Endian;
17
18pub(crate) mod stream;
19pub use stream::StreamTransport;
20
21#[cfg(feature = "tcp")]
22pub use stream::TcpStreamInfo;
23
24#[cfg(feature = "unix-stream")]
25pub use stream::UnixStreamInfo;
26
27pub(crate) mod unix;
28pub use unix::UnixTransport;
29
30#[cfg(feature = "unix-seqpacket")]
31pub use unix::UnixSeqpacketInfo;
32
33pub trait Transport: Send + 'static {
38 type Body: crate::Body;
40
41 type Info: Clone + Send + 'static;
43
44 type Config: Clone + Default + Send + Sync + 'static;
46
47 type ReadHalf<'a>: TransportReadHalf<Body = Self::Body> + 'a;
49
50 type WriteHalf<'a>: TransportWriteHalf<Body = Self::Body> + 'a;
52
53 fn split(&mut self) -> (Self::ReadHalf<'_>, Self::WriteHalf<'_>);
55
56 fn info(&self) -> std::io::Result<Self::Info>;
61}
62
63#[derive(Debug)]
68pub struct TransportError {
69 inner: Error,
71
72 is_fatal: bool,
74}
75
76impl TransportError {
77 fn new_fatal(inner: impl Into<Error>) -> Self {
81 Self {
82 inner: inner.into(),
83 is_fatal: true,
84 }
85 }
86
87 fn new_non_fatal(inner: impl Into<Error>) -> Self {
91 Self {
92 inner: inner.into(),
93 is_fatal: false,
94 }
95 }
96
97 pub fn inner(&self) -> &Error {
99 &self.inner
100 }
101
102 pub fn into_inner(self) -> Error {
104 self.inner
105 }
106
107 pub fn is_fatal(&self) -> bool {
112 self.is_fatal
113 }
114}
115
116impl std::error::Error for TransportError {}
117
118impl std::fmt::Display for TransportError {
119 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
120 self.inner.fmt(f)
121 }
122}
123
124pub trait TransportReadHalf: Send + Unpin {
126 type Body: crate::Body;
128
129 fn poll_read_msg(self: Pin<&mut Self>, context: &mut Context) -> Poll<Result<Message<Self::Body>, TransportError>>;
136
137 fn read_msg(&mut self) -> ReadMsg<Self>
139 where
140 Self: Unpin,
141 {
142 ReadMsg { inner: self }
143 }
144}
145
146pub trait TransportWriteHalf: Send + Unpin {
148 type Body: crate::Body;
150
151 fn poll_write_msg(self: Pin<&mut Self>, context: &mut Context, header: &MessageHeader, body: &Self::Body) -> Poll<Result<(), TransportError>>;
165
166 fn write_msg<'c>(&'c mut self, header: &'c MessageHeader, body: &'c Self::Body) -> WriteMsg<Self> {
168 WriteMsg { inner: self, header, body }
169 }
170}
171
172pub struct ReadMsg<'c, T>
174where
175 T: TransportReadHalf + ?Sized,
176{
177 inner: &'c mut T,
178}
179
180pub struct WriteMsg<'c, T>
182where
183 T: TransportWriteHalf + ?Sized,
184{
185 inner: &'c mut T,
186 header: &'c MessageHeader,
187 body: &'c T::Body,
188}
189
190impl<T> Future for ReadMsg<'_, T>
191where
192 T: TransportReadHalf + ?Sized + Unpin,
193{
194 type Output = Result<Message<T::Body>, TransportError>;
195
196 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
197 Pin::new(&mut *self.get_mut().inner).poll_read_msg(cx)
198 }
199}
200
201impl<T> Future for WriteMsg<'_, T>
202where
203 T: TransportWriteHalf + ?Sized + Unpin,
204{
205 type Output = Result<(), TransportError>;
206
207 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
208 let header = self.header;
209 let body = self.body;
210 Pin::new(&mut *self.get_mut().inner).poll_write_msg(cx, header, body)
211 }
212}
213
214impl<T> TransportReadHalf for &'_ mut T
215where
216 T: TransportReadHalf + Unpin + ?Sized,
217{
218 type Body = T::Body;
219
220 fn poll_read_msg(self: Pin<&mut Self>, context: &mut Context) -> Poll<Result<Message<Self::Body>, TransportError>> {
221 T::poll_read_msg(Pin::new(*self.get_mut()), context)
222 }
223}
224
225impl<T> TransportReadHalf for Box<T>
226where
227 T: TransportReadHalf + Unpin + ?Sized,
228{
229 type Body = T::Body;
230
231 fn poll_read_msg(self: Pin<&mut Self>, context: &mut Context) -> Poll<Result<Message<Self::Body>, TransportError>> {
232 T::poll_read_msg(Pin::new(&mut *self.get_mut()), context)
233 }
234}
235
236impl<P> TransportReadHalf for Pin<P>
237where
238 P: std::ops::DerefMut + Send + Unpin,
239 P::Target: TransportReadHalf,
240{
241 type Body = <P::Target as TransportReadHalf>::Body;
242
243 fn poll_read_msg(self: Pin<&mut Self>, context: &mut Context) -> Poll<Result<Message<Self::Body>, TransportError>> {
244 P::Target::poll_read_msg(Pin::new(&mut *self.get_mut()), context)
245 }
246}
247
248impl<T> TransportWriteHalf for &'_ mut T
249where
250 T: TransportWriteHalf + Unpin + ?Sized,
251{
252 type Body = T::Body;
253
254 fn poll_write_msg(
255 self: Pin<&mut Self>,
256 context: &mut Context,
257 header: &MessageHeader,
258 body: &Self::Body,
259 ) -> Poll<Result<(), TransportError>> {
260 T::poll_write_msg(Pin::new(*self.get_mut()), context, header, body)
261 }
262}
263
264impl<T> TransportWriteHalf for Box<T>
265where
266 T: TransportWriteHalf + Unpin + ?Sized,
267{
268 type Body = T::Body;
269
270 fn poll_write_msg(
271 self: Pin<&mut Self>,
272 context: &mut Context,
273 header: &MessageHeader,
274 body: &Self::Body,
275 ) -> Poll<Result<(), TransportError>> {
276 T::poll_write_msg(Pin::new(&mut *self.get_mut()), context, header, body)
277 }
278}
279
280impl<P> TransportWriteHalf for Pin<P>
281where
282 P: std::ops::DerefMut + Send + Unpin,
283 P::Target: TransportWriteHalf,
284{
285 type Body = <P::Target as TransportWriteHalf>::Body;
286
287 fn poll_write_msg(self: Pin<&mut Self>, context: &mut Context, header: &MessageHeader, body: &Self::Body) -> Poll<Result<(), TransportError>> {
288 P::Target::poll_write_msg(Pin::new(&mut *self.get_mut()), context, header, body)
289 }
290}