ironrdp_tokio/
lib.rs

1#![cfg_attr(doc, doc = include_str!("../README.md"))]
2#![doc(html_logo_url = "https://cdnweb.devolutions.net/images/projects/devolutions/logos/devolutions-icon-shadow.svg")]
3
4#[rustfmt::skip] // do not re-order this pub use
5pub use ironrdp_async::*;
6
7#[cfg(feature = "reqwest")]
8pub mod reqwest;
9
10use core::pin::Pin;
11use std::io;
12
13use bytes::BytesMut;
14use tokio::io::{AsyncRead, AsyncWrite, ReadHalf, WriteHalf};
15
16pub type TokioFramed<S> = Framed<TokioStream<S>>;
17
18pub fn split_tokio_framed<S>(framed: TokioFramed<S>) -> (TokioFramed<ReadHalf<S>>, TokioFramed<WriteHalf<S>>)
19where
20    S: Sync + Unpin + AsyncRead + AsyncWrite,
21{
22    let (stream, leftover) = framed.into_inner();
23    let (read_half, write_half) = tokio::io::split(stream);
24    let framed_read = TokioFramed::new_with_leftover(read_half, leftover);
25    let framed_write = TokioFramed::new(write_half);
26    (framed_read, framed_write)
27}
28
29pub fn unsplit_tokio_framed<S>(reader: TokioFramed<ReadHalf<S>>, writer: TokioFramed<WriteHalf<S>>) -> TokioFramed<S>
30where
31    S: Sync + Unpin + AsyncRead + AsyncWrite,
32{
33    let (reader, leftover) = reader.into_inner();
34    let writer = writer.into_inner_no_leftover();
35    TokioFramed::new_with_leftover(reader.unsplit(writer), leftover)
36}
37
38pub struct TokioStream<S> {
39    inner: S,
40}
41
42impl<S> StreamWrapper for TokioStream<S> {
43    type InnerStream = S;
44
45    fn from_inner(stream: Self::InnerStream) -> Self {
46        Self { inner: stream }
47    }
48
49    fn into_inner(self) -> Self::InnerStream {
50        self.inner
51    }
52
53    fn get_inner(&self) -> &Self::InnerStream {
54        &self.inner
55    }
56
57    fn get_inner_mut(&mut self) -> &mut Self::InnerStream {
58        &mut self.inner
59    }
60}
61
62impl<S> FramedRead for TokioStream<S>
63where
64    S: Send + Sync + Unpin + AsyncRead,
65{
66    type ReadFut<'read>
67        = Pin<Box<dyn core::future::Future<Output = io::Result<usize>> + Send + Sync + 'read>>
68    where
69        Self: 'read;
70
71    fn read<'a>(&'a mut self, buf: &'a mut BytesMut) -> Self::ReadFut<'a> {
72        use tokio::io::AsyncReadExt as _;
73
74        Box::pin(async { self.inner.read_buf(buf).await })
75    }
76}
77
78impl<S> FramedWrite for TokioStream<S>
79where
80    S: Send + Sync + Unpin + AsyncWrite,
81{
82    type WriteAllFut<'write>
83        = Pin<Box<dyn core::future::Future<Output = io::Result<()>> + Send + Sync + 'write>>
84    where
85        Self: 'write;
86
87    fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteAllFut<'a> {
88        use tokio::io::AsyncWriteExt as _;
89
90        Box::pin(async {
91            self.inner.write_all(buf).await?;
92            self.inner.flush().await?;
93
94            Ok(())
95        })
96    }
97}
98
99pub type LocalTokioFramed<S> = Framed<LocalTokioStream<S>>;
100
101pub struct LocalTokioStream<S> {
102    inner: S,
103}
104
105impl<S> StreamWrapper for LocalTokioStream<S> {
106    type InnerStream = S;
107
108    fn from_inner(stream: Self::InnerStream) -> Self {
109        Self { inner: stream }
110    }
111
112    fn into_inner(self) -> Self::InnerStream {
113        self.inner
114    }
115
116    fn get_inner(&self) -> &Self::InnerStream {
117        &self.inner
118    }
119
120    fn get_inner_mut(&mut self) -> &mut Self::InnerStream {
121        &mut self.inner
122    }
123}
124
125impl<S> FramedRead for LocalTokioStream<S>
126where
127    S: Unpin + AsyncRead,
128{
129    type ReadFut<'read>
130        = Pin<Box<dyn core::future::Future<Output = io::Result<usize>> + 'read>>
131    where
132        Self: 'read;
133
134    fn read<'a>(&'a mut self, buf: &'a mut BytesMut) -> Self::ReadFut<'a> {
135        use tokio::io::AsyncReadExt as _;
136
137        Box::pin(async { self.inner.read_buf(buf).await })
138    }
139}
140
141impl<S> FramedWrite for LocalTokioStream<S>
142where
143    S: Unpin + AsyncWrite,
144{
145    type WriteAllFut<'write>
146        = Pin<Box<dyn core::future::Future<Output = io::Result<()>> + 'write>>
147    where
148        Self: 'write;
149
150    fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteAllFut<'a> {
151        use tokio::io::AsyncWriteExt as _;
152
153        Box::pin(async {
154            self.inner.write_all(buf).await?;
155            self.inner.flush().await?;
156
157            Ok(())
158        })
159    }
160}
161
162pub type MovableTokioFramed<S> = Framed<MovableTokioStream<S>>;
163
164pub struct MovableTokioStream<S: Send> {
165    inner: S,
166}
167
168impl<S: Send> StreamWrapper for MovableTokioStream<S> {
169    type InnerStream = S;
170
171    fn from_inner(stream: Self::InnerStream) -> Self {
172        Self { inner: stream }
173    }
174
175    fn into_inner(self) -> Self::InnerStream {
176        self.inner
177    }
178
179    fn get_inner(&self) -> &Self::InnerStream {
180        &self.inner
181    }
182
183    fn get_inner_mut(&mut self) -> &mut Self::InnerStream {
184        &mut self.inner
185    }
186}
187
188impl<S> FramedRead for MovableTokioStream<S>
189where
190    S: Send + Unpin + AsyncRead,
191{
192    type ReadFut<'read>
193        = Pin<Box<dyn core::future::Future<Output = io::Result<usize>> + Send + 'read>>
194    where
195        Self: 'read;
196
197    fn read<'a>(&'a mut self, buf: &'a mut BytesMut) -> Self::ReadFut<'a> {
198        use tokio::io::AsyncReadExt as _;
199
200        Box::pin(async { self.inner.read_buf(buf).await })
201    }
202}
203
204impl<S> FramedWrite for MovableTokioStream<S>
205where
206    S: Send + Unpin + AsyncWrite,
207{
208    type WriteAllFut<'write>
209        = Pin<Box<dyn core::future::Future<Output = io::Result<()>> + Send + 'write>>
210    where
211        Self: 'write;
212
213    fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteAllFut<'a> {
214        use tokio::io::AsyncWriteExt as _;
215
216        Box::pin(async {
217            self.inner.write_all(buf).await?;
218            self.inner.flush().await?;
219
220            Ok(())
221        })
222    }
223}