ironrdp_futures/
lib.rs

1#![cfg_attr(doc, doc = include_str!("../README.md"))]
2#![doc(html_logo_url = "https://cdnweb.devolutions.net/images/projects/devolutions/logos/devolutions-icon-shadow.svg")]
3
4#[rustfmt::skip] // do not re-order this pub use
5pub use ironrdp_async::*;
6
7use core::pin::Pin;
8use std::io;
9
10use bytes::BytesMut;
11use futures_util::io::{AsyncRead, AsyncWrite};
12
13pub type FuturesFramed<S> = Framed<FuturesStream<S>>;
14
15pub struct FuturesStream<S> {
16    inner: S,
17}
18
19impl<S> StreamWrapper for FuturesStream<S> {
20    type InnerStream = S;
21
22    fn from_inner(stream: Self::InnerStream) -> Self {
23        Self { inner: stream }
24    }
25
26    fn into_inner(self) -> Self::InnerStream {
27        self.inner
28    }
29
30    fn get_inner(&self) -> &Self::InnerStream {
31        &self.inner
32    }
33
34    fn get_inner_mut(&mut self) -> &mut Self::InnerStream {
35        &mut self.inner
36    }
37}
38
39impl<S> FramedRead for FuturesStream<S>
40where
41    S: Send + Sync + Unpin + AsyncRead,
42{
43    type ReadFut<'read>
44        = Pin<Box<dyn core::future::Future<Output = io::Result<usize>> + Send + Sync + 'read>>
45    where
46        Self: 'read;
47
48    fn read<'a>(&'a mut self, buf: &'a mut BytesMut) -> Self::ReadFut<'a> {
49        use futures_util::io::AsyncReadExt as _;
50
51        Box::pin(async {
52            // NOTE(perf): tokio implementation is more efficient
53            let mut read_bytes = [0u8; 1024];
54            let len = self.inner.read(&mut read_bytes).await?;
55            buf.extend_from_slice(&read_bytes[..len]);
56
57            Ok(len)
58        })
59    }
60}
61
62impl<S> FramedWrite for FuturesStream<S>
63where
64    S: Send + Sync + Unpin + AsyncWrite,
65{
66    type WriteAllFut<'write>
67        = Pin<Box<dyn core::future::Future<Output = io::Result<()>> + Send + Sync + 'write>>
68    where
69        Self: 'write;
70
71    fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteAllFut<'a> {
72        use futures_util::io::AsyncWriteExt as _;
73
74        Box::pin(async {
75            self.inner.write_all(buf).await?;
76            self.inner.flush().await?;
77
78            Ok(())
79        })
80    }
81}
82
83pub type LocalFuturesFramed<S> = Framed<LocalFuturesStream<S>>;
84
85pub struct LocalFuturesStream<S> {
86    inner: S,
87}
88
89impl<S> StreamWrapper for LocalFuturesStream<S> {
90    type InnerStream = S;
91
92    fn from_inner(stream: Self::InnerStream) -> Self {
93        Self { inner: stream }
94    }
95
96    fn into_inner(self) -> Self::InnerStream {
97        self.inner
98    }
99
100    fn get_inner(&self) -> &Self::InnerStream {
101        &self.inner
102    }
103
104    fn get_inner_mut(&mut self) -> &mut Self::InnerStream {
105        &mut self.inner
106    }
107}
108
109impl<S> FramedRead for LocalFuturesStream<S>
110where
111    S: Unpin + AsyncRead,
112{
113    type ReadFut<'read>
114        = Pin<Box<dyn core::future::Future<Output = io::Result<usize>> + 'read>>
115    where
116        Self: 'read;
117
118    fn read<'a>(&'a mut self, buf: &'a mut BytesMut) -> Self::ReadFut<'a> {
119        use futures_util::io::AsyncReadExt as _;
120
121        Box::pin(async {
122            // NOTE(perf): tokio implementation is more efficient
123            let mut read_bytes = [0u8; 1024];
124            let len = self.inner.read(&mut read_bytes[..]).await?;
125            buf.extend_from_slice(&read_bytes[..len]);
126
127            Ok(len)
128        })
129    }
130}
131
132impl<S> FramedWrite for LocalFuturesStream<S>
133where
134    S: Unpin + AsyncWrite,
135{
136    type WriteAllFut<'write>
137        = Pin<Box<dyn core::future::Future<Output = io::Result<()>> + 'write>>
138    where
139        Self: 'write;
140
141    fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteAllFut<'a> {
142        use futures_util::io::AsyncWriteExt as _;
143
144        Box::pin(async {
145            self.inner.write_all(buf).await?;
146            self.inner.flush().await?;
147
148            Ok(())
149        })
150    }
151}