1#![deny(clippy::correctness)]
4#![warn(
5 clippy::perf,
6 clippy::complexity,
7 clippy::style,
8 clippy::nursery,
9 clippy::pedantic,
10 clippy::clone_on_ref_ptr,
11 clippy::decimal_literal_representation,
12 clippy::float_cmp_const,
13 clippy::missing_docs_in_private_items,
14 clippy::multiple_inherent_impl,
15 clippy::unwrap_used,
16 clippy::cargo_common_metadata,
17 clippy::used_underscore_binding
18)]
19
20mod queue;
21mod read_pipe;
22mod write_pipe;
23use crate::queue::Queue;
24use crate::read_pipe::ReadPipe;
25use crate::write_pipe::WritePipe;
26use rustls::{ConnectionCommon, StreamOwned};
27use std::fmt::{Arguments, Debug};
28use std::io::{ErrorKind, Read, Write};
29use std::ops::{Deref, DerefMut};
30use std::sync::atomic::AtomicBool;
31use std::sync::atomic::Ordering::SeqCst;
32use std::sync::{Arc, LockResult, Mutex};
33use std::time::Duration;
34use std::{io, thread};
35
/// TLS stream built on a rustls connection whose `read`, `write` and `flush`
/// operations all take `&self`.
///
/// Reads are serialized by `read_mutex` and writes/flushes by `write_mutex`;
/// the rustls stream itself sits behind its own mutex (`connection`).
#[derive(Debug)]
pub struct RustTlsDuplexStream<C, S>
where
    C: DerefMut + Deref<Target = ConnectionCommon<S>> + Send,
{
    /// When `true`, `read` hands back any error from the connection
    /// (including `WouldBlock`) immediately instead of waiting for more data.
    non_blocking_read: AtomicBool,
    /// Timeout passed to `read_q.await_pop` while a blocking `read` waits.
    read_timeout: Mutex<Option<Duration>>,
    /// Timeout passed to `write_q.flush_low` at the start of every `write`.
    write_timeout: Mutex<Option<Duration>>,
    /// The rustls stream pairing the connection state `C` with the
    /// `CombinedPipe` transport; locked by every read, write and flush.
    connection: Mutex<StreamOwned<C, CombinedPipe>>,
    /// Queue handle duplicated from the read pipe (`dup_queue`); `read` waits
    /// on it when the connection reports `WouldBlock`.
    read_q: Arc<Queue>,
    /// Queue handle duplicated from the write pipe (`dup_queue`); used by
    /// `write` (`flush_low`) and `flush` (`flush_zero`).
    write_q: Arc<Queue>,
    /// Held for the whole duration of `write` and `flush`.
    write_mutex: Mutex<()>,
    /// Held for the whole duration of `read`.
    read_mutex: Mutex<()>,
}
58
59impl<C, S> RustTlsDuplexStream<C, S>
60where
61 C: DerefMut + Deref<Target = ConnectionCommon<S>> + Send,
62 S: rustls::SideData,
63{
64 pub fn new_unpooled<R, W>(con: C, read: R, write: W) -> io::Result<Self>
82 where
83 R: Read + Send + 'static,
84 W: Write + Send + 'static,
85 {
86 Self::new(con, read, write, |task| {
87 thread::Builder::new().spawn(task).map(|_| {})
88 })
89 }
90
91 pub fn new<R, W, T>(con: C, read: R, write: W, spawner: T) -> io::Result<Self>
108 where
109 R: Read + Send + 'static,
110 W: Write + Send + 'static,
111 T: FnMut(Box<dyn FnOnce() + Send>) -> io::Result<()>,
112 {
113 let pipe = CombinedPipe::new(read, write, spawner)?;
114 let read_q = pipe.0.dup_queue();
115 let write_q = pipe.1.dup_queue();
116
117 Ok(Self {
118 non_blocking_read: AtomicBool::new(false),
119 read_q,
120 write_q,
121 write_mutex: Mutex::new(()),
122 read_mutex: Mutex::new(()),
123 connection: Mutex::new(StreamOwned::new(con, pipe)),
124 read_timeout: Mutex::new(None),
125 write_timeout: Mutex::new(None),
126 })
127 }
128
129 pub fn write(&self, buffer: &[u8]) -> io::Result<usize> {
133 let _outer_guard = unwrap_poison(self.write_mutex.lock())?; let timeout_copy = unwrap_poison(self.write_timeout.lock())?
135 .deref()
136 .as_ref()
137 .copied();
138 self.write_q.flush_low(timeout_copy)?;
139 unwrap_poison(self.connection.lock())?.write(buffer)
140 }
141
142 pub fn flush(&self) -> io::Result<()> {
146 let _outer_guard = unwrap_poison(self.write_mutex.lock())?; unwrap_poison(self.connection.lock())?.flush()?;
148 self.write_q.flush_zero()
149 }
150
151 pub fn read(&self, buffer: &mut [u8]) -> io::Result<usize> {
155 let _outer_guard = unwrap_poison(self.read_mutex.lock())?; loop {
157 let mut guard = unwrap_poison(self.connection.lock())?;
158 guard.sock.0.nb(true); let res = guard.read(buffer);
160 guard.sock.0.nb(false); return match res {
162 Ok(count) => {
163 drop(guard);
164 Ok(count)
165 }
166 Err(err) => {
167 if self.non_blocking_read.load(SeqCst) {
168 return Err(err);
169 }
170 if err.kind() == ErrorKind::WouldBlock {
171 let timeout_copy = unwrap_poison(self.read_timeout.lock())?
173 .deref()
174 .as_ref()
175 .copied();
176 self.read_q.await_pop(guard, timeout_copy)?;
178 continue;
179 }
180
181 drop(guard);
182 Err(err)
183 }
184 };
185 }
186 }
187
188 pub fn set_read_timeout(&self, timeout: Option<Duration>) -> io::Result<()> {
197 *unwrap_poison(self.read_timeout.lock())? = timeout;
198 Ok(())
199 }
200
201 pub fn read_timeout(&self) -> io::Result<Option<Duration>> {
205 Ok(unwrap_poison(self.read_timeout.lock())?.as_ref().copied())
206 }
207
208 pub fn set_read_non_block(&self, on: bool) -> io::Result<()> {
214 self.non_blocking_read.store(on, SeqCst);
215 Ok(())
216 }
217
218 pub fn set_write_timeout(&self, timeout: Option<Duration>) -> io::Result<()> {
226 *unwrap_poison(self.write_timeout.lock())? = timeout;
227 Ok(())
228 }
229
230 pub fn write_timeout(&self) -> io::Result<Option<Duration>> {
234 Ok(unwrap_poison(self.write_timeout.lock())?.as_ref().copied())
235 }
236
237 pub fn read_to_end(&self, buf: &mut Vec<u8>) -> io::Result<usize> {
241 Read::read_to_end(&mut &*self, buf)
242 }
243
244 pub fn read_to_string(&self, buf: &mut String) -> io::Result<usize> {
248 Read::read_to_string(&mut &*self, buf)
249 }
250
251 pub fn read_exact(&self, buf: &mut [u8]) -> io::Result<()> {
255 Read::read_exact(&mut &*self, buf)
256 }
257
258 pub fn write_all(&self, buf: &[u8]) -> io::Result<()> {
262 Write::write_all(&mut &*self, buf)
263 }
264
265 pub fn write_fmt(&self, fmt: Arguments<'_>) -> io::Result<()> {
269 Write::write_fmt(&mut &*self, fmt)
270 }
271}
272
273impl<C, S> Read for RustTlsDuplexStream<C, S>
274where
275 C: DerefMut + Deref<Target = ConnectionCommon<S>> + Send,
276 S: rustls::SideData,
277{
278 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
279 Self::read(self, buf)
280 }
281}
282
283impl<C, S> Read for &RustTlsDuplexStream<C, S>
284where
285 C: DerefMut + Deref<Target = ConnectionCommon<S>> + Send,
286 S: rustls::SideData,
287{
288 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
289 RustTlsDuplexStream::<C, S>::read(self, buf)
290 }
291}
292
293impl<C, S> Write for RustTlsDuplexStream<C, S>
294where
295 C: DerefMut + Deref<Target = ConnectionCommon<S>> + Send,
296 S: rustls::SideData,
297{
298 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
299 Self::write(self, buf)
300 }
301
302 fn flush(&mut self) -> io::Result<()> {
303 Self::flush(self)
304 }
305}
306
307impl<C, S> Write for &RustTlsDuplexStream<C, S>
308where
309 C: DerefMut + Deref<Target = ConnectionCommon<S>> + Send,
310 S: rustls::SideData,
311{
312 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
313 RustTlsDuplexStream::<C, S>::write(self, buf)
314 }
315
316 fn flush(&mut self) -> io::Result<()> {
317 RustTlsDuplexStream::<C, S>::flush(self)
318 }
319}
320
/// Pairs a `ReadPipe` (field `0`) with a `WritePipe` (field `1`) so the two
/// together can act as the single `Read + Write` transport given to the
/// rustls `StreamOwned`.
#[derive(Debug)]
struct CombinedPipe(ReadPipe, WritePipe);
325
326impl CombinedPipe {
327 pub fn new<
329 R: Read + Send + 'static,
330 W: Write + Send + 'static,
331 T: FnMut(Box<dyn FnOnce() + Send>) -> io::Result<()>,
332 >(
333 read: R,
334 write: W,
335 mut spawner: T,
336 ) -> io::Result<Self> {
337 Ok(Self(
338 ReadPipe::new(read, &mut spawner)?,
339 WritePipe::new(write, &mut spawner)?,
340 ))
341 }
342}
343
/// Reading from the combined pipe reads from its `ReadPipe` half.
impl Read for CombinedPipe {
    /// Forwards to `read` on the `ReadPipe` in field `0` and returns its
    /// result unchanged.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.0.read(buf)
    }
}
349
/// Writing to the combined pipe writes to its `WritePipe` half.
impl Write for CombinedPipe {
    /// Forwards to `write` on the `WritePipe` in field `1` and returns its
    /// result unchanged.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.1.write(buf)
    }

    /// Forwards to `flush` on the `WritePipe` in field `1`.
    fn flush(&mut self) -> io::Result<()> {
        self.1.flush()
    }
}
359
/// Converts the outcome of a lock acquisition into an `io::Result`.
///
/// A successful lock is passed through untouched. A poisoned lock is turned
/// into an `io::Error` of kind `ErrorKind::Other` carrying the message
/// `"Poisoned Mutex"`; the poison error (and the guard inside it) is dropped.
pub(crate) fn unwrap_poison<T>(result: LockResult<T>) -> io::Result<T> {
    match result {
        Ok(guard) => Ok(guard),
        Err(_poisoned) => Err(io::Error::new(ErrorKind::Other, "Poisoned Mutex")),
    }
}