rtv/
client.rs

1
//! This module contains an HTTP [`Client`] that runs on top of `mio`.
3
4use mio::net::TcpStream;
5use chunked_transfer::Decoder as ChunkedDecoder;
6use std::{io::{self, Write, Read}, time::{Duration, Instant}, collections::HashMap, net::{SocketAddr, Ipv4Addr}, mem::replace};
7use crate::{dns, util::{make_socket_addr, notconnected, register_all, wouldblock, hash}, ResponseHead, ReqId, Response, ResponseState, Mode, Status, OwnedHeader, RawRequest};
8
9#[cfg(feature = "tls")]
10use std::sync::Arc;
11
12/// A flexible HTTP client.
13///
14/// Use the client if you wanna have controll over `mio` yourself.
15/// You should look at the documentation of the individual methods for more info on
16/// what exactly they do.
17///
18/// In general, you pass the client a handle to your `Poll` when you send a request.
19/// Inside you `mio` event loop, when you get an event, you then call the [`Client::pump`] function,
20/// which drives the request to completion.
21///
22/// # Example
23///
24/// This is more or less a full blown example on what it takes to correctly
25/// send a request.
26///
27/// ```rust
28///
29/// let io = mio::Poll::new()?;
30/// let mut client = rtv::Client::new(mio::Token(0));
31///
32/// let request = Request::get().host("example.com").https();
33/// let _id = client.send(&io, mio::Token(2), request)?;
34/// //  ^^^ the returned id can be used to check which response belongs to which request
35/// //      although we are just sending one request here so this isn't needed
36///
37/// // we have to store the body ourselfes
38/// let mut response_body = Vec::new();
39///
40/// 'ev: loop {
41///     
42///     // see note below on how to handle timeouts
43///     io.poll(&mut events, client.timeout())?;
44///     
45///     // loop over all the responses we may have gotten
46///     // you don't need to handle events generated by rtv in any other way
47///     for resp in client.pump(&io, &events)? {
48///         match resp.state {
49///             rtv::ResponseState::Head(head) => {
50///                 // the head contains headers etc.
51///                 pritnln!("Content-Length: {}", head.content_length);
52///                 pritnln!("Some header: {}", head.headers[0]);
53///             },
54///             rtv::ResponseState::Data(some_data) => {
55///                 // you will receive data in small chunks as it comes in
56///                 response_body.extend(some_data);
57///             },
58///             rtv::ResponseState::Done => {
59///                 break 'ev;
60///             },
61///             // maybe a timeout or I/O error
62///             other => panic!("Error: {}", other),
63///         };
64///     };
65///     
66///     events.clear();
67///
68/// }
69///
70/// let body_str = str::from_utf8(&response_body)?;
71/// println!("{}", body_str);
72///
73/// ```
74/// 
75/// # Timeouts
76///
77/// Rtv supports a timeout for every individual request. It will even be
78/// applied to dns resolution.
79///
80/// You have to specify this timeout in two places. First, when creating your
81/// `Request` and then once again when waiting for events with `mio`.
82///
83/// The timeout used with `mio` always has to match the smallest time left
84/// for any request currently in progress, so that the `Client` can terminate
85/// the request if the timeout is reached.
86///
87/// You could do this manually but you should probably use [`Client::timeout`]
88/// which does the calculation for you.
89pub struct Client {
90    dns: dns::DnsClient,
91    dns_cache: HashMap<u64, CachedAddr>,
92    requests: Vec<InternalReq>,
93    next_id: usize,
94    #[cfg(feature = "tls")]
95    tls_config: Arc<rustls::ClientConfig>,
96    #[cfg(not(feature = "tls"))]
97    tls_config: (),
98}
99
100impl Client {
101
102    /// Creates a new client.
103    ///
104    /// The token you pass in will be used for dns resolution as
105    /// this requires (only) one socket.
106    #[inline(always)]
107    pub fn new(token: mio::Token) -> Self {
108
109        let tls_config = Self::default_tls_config();
110        
111        Self {
112            dns: dns::DnsClient::new(token),
113            dns_cache: HashMap::new(),
114            requests: Vec::new(),
115            next_id: 0,
116            tls_config,
117        }
118
119    }
120
121    /// Creates a new client with a custom [`ClientConfig`](rustls::ClientConfig).
122    ///
123    /// The token you pass in will be used for dns resolution as
124    /// this requires (only) one socket.
125    #[cfg(feature = "tls")]
126    #[inline(always)]
127    pub fn with_tls_config(token: mio::Token, tls_config: Arc<rustls::ClientConfig>) -> Self {
128        
129        Self {
130            dns: dns::DnsClient::new(token),
131            dns_cache: HashMap::new(),
132            requests: Vec::new(),
133            next_id: 0,
134            tls_config,
135        }
136
137    }
138
139    /// Send a request.
140    ///
141    /// The token you pass in will be used for this request's TCP connection.
142    /// It will be available again once the request completed.
143    ///
144    /// This function will return a [`ReqId`] that can be used to check which response
145    /// belongs to which request later.
146    ///
147    /// For more information on how to create a request see [`Request`] and [`RequestBuilder`](crate::RequestBuilder).
148    /// If you wanna set a timeout, you can do so when creating a request.
149    /// This function can take anything that implements `Into<Request>` so you can pass it a
150    /// `Request` or a `RequestBuilder`, both will work.
151    /// 
152    /// # Example
153    ///
154    /// ```rust
155    /// let request = Request::get().host("example.com");
156    /// client.send(&io, mio::Token(1), request)?; // io is the mio::Poll
157    /// ```
158    pub fn send(&mut self, io: &mio::Poll, token: mio::Token, input: impl Into<RawRequest>) -> io::Result<ReqId> {
159
160        let request = input.into();
161
162        let id = self.next_id;
163        self.next_id = self.next_id.wrapping_add(1);
164
165        let mode = InternalMode::from_mode(request.mode, &self.tls_config, request.host());
166
167        let maybe_cached = self.dns_cache.get(&hash(request.host()));
168        let state = match maybe_cached {
169
170            Some(cached_addr) if !cached_addr.is_outdated() => {
171
172                let mut connection = Connection::new(cached_addr.ip_addr, mode)?;
173                register_all(io, &mut connection, token)?;
174                InternalReqState::Sending {
175                    body: request.bytes,
176                    connection,
177                }
178
179            },
180
181            _not_cached_or_old => {
182
183                let dns_id = self.dns.resolve(io, request.host(), request.timeout)?;
184                InternalReqState::Resolving {
185                    host: hash(request.host()),
186                    body: request.bytes,
187                    dns_id,
188                    mode
189                }
190
191            },
192
193        };
194
195        let internal_req = InternalReq {
196            id,
197            token,
198            state,
199            time_created: Instant::now(),
200            timeout: request.timeout,
201        };
202
203        self.requests.push(internal_req);
204
205        Ok(ReqId { inner: id })
206
207    }
208
209    /// Drive all sent requests to completion and get the responses.
210    ///
211    /// The `pump` function must be executed everytime an event is generated which
212    /// belongs to this `Client`. You don't need to match against the event token
213    /// yourself though as this is done internally.
214    /// All events not belonging to this `Client` will be ignored.
215    ///
216    /// This function will return a `Vec` of responses, that contain the [`ReqId`] of
217    /// the request that belongs to the response.
218    /// The returned `Vec` may be empty, for example if the event belonged to dns resolution.
219    ///
220    /// In general a request will go through following stages:
221    /// 1. Dns resolution, which will generate one or more events.
222    /// 2. Receiving the head, with information about the response such as the content length.
223    ///    ([`ResponseState::Head`])
224    /// 3. Receiving the body, which will generate multiple events.
225    ///    ([`ResponseState::Data`])
226    /// 4. In the end either [`ResponseState::Done`] or [`ResponseState::Error`].
227    ///
228    /// # Example
229    ///
230    /// ```rust
231    /// let events = ...; // wait for new events using mio
232    /// let resps = client.pump(&io, &events)?;
233    /// if resps.is_empty() { println!("Got an event but no response yet!") }
234    /// for resp in resps {
235    ///     println!("Got a response: {:?}", resp.state);
236    /// }
237    /// ```
238    ///
239    /// # Note
240    ///
241    /// The maximum response header count is currently 4096, but this will be
242    /// user-controllable in the future.
243    pub fn pump(&mut self, io: &mio::Poll, events: &mio::Events) -> io::Result<Vec<Response>> {
244
245        let mut responses = Vec::new();
246
247        let dns_resps = self.dns.pump(&io, events)?;
248
249        'rq: for request in self.requests.iter_mut() {
250
251            // finish timed out requests
252            if request.timeout.unwrap_or(Duration::MAX) <= request.time_created.elapsed() {
253
254                responses.push(Response::new(request.id, ResponseState::TimedOut));
255                request.deregister(&io)?; // todo: make io errors not "hard errors" but make them
256                // also be per-request and make it so that you can retry completing the request
257                // after an io error (maybe?)
258                request.finish_error();
259
260            } else {
261
262                if let Some(connection) = request.state.connection_mut() {
263                    // we need to "pump" rustls so it can do the handshake etc.
264                    connection.complete_io()?;
265                }
266
267                for event in events.iter() {
268
269                    match &mut request.state {
270
271                        InternalReqState::Resolving { dns_id, .. } => {
272
273                            for resp in dns_resps.iter() {
274
275                                if &resp.id == dns_id {
276
277                                    // dispatch the result
278                                    // we don't need to call deregister on error since
279                                    // we haven't registered anything yet
280                                    let (addr, ttl) = match resp.outcome {
281                                        dns::DnsOutcome::Known { addr, ttl } => (addr, ttl),
282                                        dns::DnsOutcome::Unknown => {
283                                            responses.push(Response::new(request.id, ResponseState::UnknownHost));
284                                            request.finish_error();
285                                            continue 'rq;
286                                        },
287                                        dns::DnsOutcome::ProtocolError => {
288                                            responses.push(Response::new(request.id, ResponseState::ProtocolError));
289                                            request.finish_error();
290                                            continue 'rq;
291                                        },
292                                        dns::DnsOutcome::TimedOut => {
293                                            responses.push(Response::new(request.id, ResponseState::TimedOut));
294                                            request.finish_error();
295                                            continue 'rq;
296                                        },
297                                    };
298
299                                    let state = replace(&mut request.state, InternalReqState::Unspecified);
300                                    if let InternalReqState::Resolving { body, host, mode, .. } = state {
301
302                                        self.dns_cache.insert(host, CachedAddr {
303                                            ip_addr: addr,
304                                            time_created: Instant::now(),
305                                            ttl,
306                                        });
307
308                                        let mut connection = Connection::new(addr, mode)?;
309                                        register_all(io, &mut connection, request.token)?;
310
311                                        request.state = InternalReqState::Sending { body, connection };
312
313                                        continue 'rq;
314
315                                    } else {
316                                        unreachable!()
317                                    }
318
319                                }
320                                
321                            }
322
323                        },
324
325                        InternalReqState::Sending { body, connection } => {
326
327                            if event.token() == request.token {
328
329                                match connection.peer_addr() {
330                                    Ok(..) => {
331
332                                        match connection.write(&body) {
333                                            Ok(..) => (),
334                                            // during tls handshake it blocks (since the stream is still in rustls's controll)
335                                            Err(err) if wouldblock(&err) => continue 'rq,
336                                            Err(other) => return Err(other),
337                                        };
338
339                                        let state = replace(&mut request.state, InternalReqState::Unspecified);
340                                        if let InternalReqState::Sending { connection, .. } = state {
341
342                                            request.state = InternalReqState::RecvHead {
343                                                connection,
344                                                buffer: Vec::with_capacity(1024),
345                                            };
346
347                                        } else {
348                                            unreachable!()
349                                        }
350
351                                    },
352                                    Err(err) if notconnected(&err) => continue 'rq,
353                                    Err(other) => return Err(other),
354                                }
355
356                            }
357
358                        },
359
360                        // this is handeled in this kinda scuffed way to avoid some code duplication
361                        // after succesfully reading the `Head` the state is updated to `RecvBody`
362                        // which causes both the code for `RecvHead` and `RecvBody` to run
363                        InternalReqState::RecvHead { .. } |
364                        InternalReqState::RecvBody { .. } => {
365
366                            if event.token() == request.token {
367
368                                // we will get another `writable` event after sending the payload
369                                // so we have to check here that this is actually a `readable` event
370                                if event.is_readable() {
371
372                                    if let InternalReqState::RecvHead { connection, buffer } = &mut request.state {
373
374                                        let mut bytes_read = buffer.len();
375                                        let mut closed = false; 
376
377                                        loop {
378
379                                            buffer.resize(bytes_read + 2048, 0u8);
380                                            bytes_read += match connection.read(&mut buffer[bytes_read..]) {
381                                                Ok(0) => { closed = true; break },
382                                                Ok(num) => num,
383                                                Err(err) if wouldblock(&err) => break,
384                                                Err(other) => return Err(other),
385                                            };
386
387                                        }
388
389                                        buffer.truncate(bytes_read);
390
391                                        let mut headers = [httparse::EMPTY_HEADER; 4096]; // todo: make the max header count be controllable by the user
392                                        let mut head = httparse::Response::new(&mut headers);
393                                        let status = match head.parse(&buffer) {
394                                            Ok(val) => val,
395                                            Err(_err) => {
396                                                responses.push(Response::new(request.id, ResponseState::ProtocolError));
397                                                request.finish_error();
398                                                continue 'rq;
399                                            }
400                                        };
401
402                                        if let httparse::Status::Complete(body_start) = status {
403
404                                            let content_length = head.headers.iter()
405                                                .find(|header| header.name == "Content-Length")
406                                                .map(|header| usize::from_str_radix(std::str::from_utf8(header.value)
407                                                    .expect("Content-Length was invalid utf8"), 10)
408                                                    .expect("Content-Length was not a number"))
409                                                .unwrap_or_default();
410
411                                            let transfer_chunked = head.headers.iter()
412                                                .find(|header| header.name == "Transfer-Encoding" && header.value == b"chunked")
413                                                .is_some();
414
415                                            responses.push(Response {
416                                                id: ReqId { inner: request.id },
417                                                state: ResponseState::Head(ResponseHead {
418                                                    status: Status {
419                                                        code: head.code.expect("missing status code"),
420                                                        reason: head.reason.expect("missing reason").to_string(),
421                                                    },
422                                                    content_length,
423                                                    transfer_chunked,
424                                                    headers: head.headers.iter().map(OwnedHeader::from).collect(),
425                                                })
426                                            });
427
428                                            // remove the parsed head from the buffer
429                                            buffer.drain(..body_start);
430
431                                            let state = replace(&mut request.state, InternalReqState::Unspecified);
432                                            if let InternalReqState::RecvHead { connection, buffer } = state {
433
434                                                let chain = io::Cursor::new(buffer).chain(connection);
435                                                let recv = if transfer_chunked {
436                                                    RecvBody::Chunked(ChunkedDecoder::new(chain))
437                                                } else {
438                                                    RecvBody::Plain(chain)
439                                                };
440
441                                                request.state = InternalReqState::RecvBody {
442                                                    recv,
443                                                    bytes_read_total: 0,
444                                                    content_length
445                                                };
446
447                                                // fall through to RecvBody
448
449                                            } else {
450                                                unreachable!()
451                                            }
452
453                                        } else if closed {
454                                            responses.push(Response::new(request.id, ResponseState::Aborted));
455                                            request.finish_error();
456                                            continue 'rq;
457                                        }
458
459                                    }
460
461                                }
462
463                                if let InternalReqState::RecvBody { recv, bytes_read_total, content_length } = &mut request.state {
464
465                                    let mut data = Vec::new();
466                                    let mut bytes_read = 0;
467                                    let mut closed = false; 
468
469                                    loop {
470
471                                        data.resize(bytes_read + 2048, 0u8);
472                                        bytes_read += match recv.read(&mut data[bytes_read..]) {
473                                            Ok(0) => { closed = true; break },
474                                            Ok(num) => num,
475                                            Err(err) if wouldblock(&err) => break,
476                                            Err(other) => return Err(other),
477                                        };
478
479                                    }
480
481                                    data.truncate(bytes_read);
482
483                                    if bytes_read > 0 {
484
485                                        // return the data we just read as a response
486                                        responses.push(Response {
487                                            id: ReqId { inner: request.id },
488                                            state: ResponseState::Data(data),
489                                        });
490
491                                        *bytes_read_total += bytes_read;
492
493                                    }
494
495                                    let is_chunked = recv.is_chunked();
496                                    if  is_chunked && (closed == true) ||
497                                       !is_chunked && (bytes_read_total >= content_length) {
498
499                                        responses.push(Response {
500                                            id: ReqId { inner: request.id },
501                                            state: ResponseState::Done,
502                                        });
503
504                                        request.deregister(&io)?;
505                                        request.finish_done();
506
507                                        continue 'rq
508
509                                    } else if closed {
510                                        responses.push(Response::new(request.id, ResponseState::Aborted));
511                                        request.finish_error();
512                                        continue 'rq;
513                                    }
514
515                                }
516
517                            }
518
519                        },
520
521                        _other => unreachable!(),
522
523                    }
524                    
525                }
526
527            }
528
529        }
530
531        // remove all the finished requests
532        self.requests.retain(|request|
533            !request.is_finished()
534        );
535
536        Ok(responses)
537
538    }
539
540    /// Returns the smallest timeout for any of the current requests.
541    ///
542    /// Use this function to always correctly set the timeout when waiting for events with `mio`.
543    ///
544    /// # Example
545    ///
546    /// ```rust
547    /// client.send(&io, req1); // imagine 750ms timeout set on this request
548    /// client.send(&io, req2); // imagine 3s timeout set on this other one
549    /// io.poll(&mut events, client.timeout())?; // poll with smallest time left (here ~750ms)
550    /// ```
551    ///
552    /// # Note
553    ///
554    /// This function comes with a very small runtime cost sinc it has to loop over all current requests.
555    #[inline(always)]
556    pub fn timeout(&self) -> Option<Duration> {
557        let now = Instant::now();
558        self.requests.iter().filter_map(|request|
559            request.timeout.map(|timeout| timeout.checked_sub(now - request.time_created).unwrap_or(Duration::ZERO))
560        ).min()
561    }
562
563    #[cfg(feature = "tls")]
564    #[inline(always)]
565    fn default_tls_config() -> Arc<rustls::ClientConfig> {
566
567        let mut root_store = rustls::RootCertStore::empty();
568        root_store.add_trust_anchors(
569            webpki_roots::TLS_SERVER_ROOTS.0.iter().map(|ta|
570                rustls::OwnedTrustAnchor::from_subject_spki_name_constraints(ta.subject, ta.spki, ta.name_constraints)
571            )
572        );
573
574        let config = rustls::ClientConfig::builder()
575            .with_safe_defaults()
576            .with_root_certificates(root_store)
577            .with_no_client_auth();
578
579        Arc::new(config)
580
581    }
582
583    #[cfg(not(feature = "tls"))]
584    fn default_tls_config() -> () {
585        ()
586    }
587
588}
589
590struct InternalReq {
591    id: usize,
592    token: mio::Token,
593    time_created: Instant,
594    timeout: Option<Duration>,
595    state: InternalReqState,
596}
597
598impl InternalReq {
599    pub fn deregister(&mut self, io: &mio::Poll) -> io::Result<()> {
600        if let Some(conn) = self.state.connection_mut() {
601            io.registry().deregister(conn)
602        } else {
603            Ok(())
604        }
605    }
606    pub fn finish_done(&mut self) {
607        let _unused = replace(&mut self.state, InternalReqState::Done);
608    }
609    pub fn finish_error(&mut self) {
610        let _unused = replace(&mut self.state, InternalReqState::Error);
611    }
612    pub fn is_finished(&self) -> bool {
613        matches!(&self.state, InternalReqState::Done | InternalReqState::Error)
614    }
615}
616
617enum InternalReqState {
618    Unspecified,
619    Error,
620    Done,
621    Resolving {
622        body: Vec<u8>, // sent later
623        dns_id: dns::DnsId,
624        host: u64, // hashed, used for caching
625        mode: InternalMode, // used to create the connection later
626    },
627    Sending   {
628        body: Vec<u8>, // sent during this state
629        connection: Connection,
630    },
631    RecvHead  {
632        connection: Connection,
633        buffer: Vec<u8>,
634    },
635    RecvBody  {
636        recv: RecvBody,
637        bytes_read_total: usize,
638        content_length: usize,
639    },
640}
641
642impl InternalReqState {
643    pub fn connection_mut(&mut self) -> Option<&mut Connection> {
644        match self {
645            Self::Sending { connection, .. } => Some(connection),
646            Self::RecvHead { connection, .. } => Some(connection),
647            Self::RecvBody { recv, .. } => Some(recv.connection_mut()),
648            _other => None,
649        }
650    }
651}
652
653enum RecvBody {
654    Plain(io::Chain<io::Cursor<Vec<u8>>, Connection>),
655    Chunked(ChunkedDecoder<io::Chain<io::Cursor<Vec<u8>>, Connection>>)
656}
657
658impl RecvBody {
659    pub fn connection_mut(&mut self) -> &mut Connection {
660        match self {
661            Self::Plain(conn) => conn.get_mut().1,
662            Self::Chunked(decoder) => decoder.get_mut().get_mut().1
663        }
664    }
665    pub fn is_chunked(&self) -> bool {
666        match self {
667            Self::Plain(..) => false,
668            Self::Chunked(..) => true
669        }
670    }
671}
672
673impl io::Read for RecvBody {
674    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
675        match self {
676            Self::Plain(conn) => conn.read(buf),
677            Self::Chunked(decoder) => decoder.read(buf)
678        }
679    }
680}
681
/// A resolved address together with the information needed to expire it.
struct CachedAddr {
    /// The resolved address.
    pub ip_addr: Ipv4Addr,
    /// When the entry was inserted into the cache.
    pub time_created: Instant,
    /// How long the entry stays valid after `time_created`.
    pub ttl: Duration,
}

impl CachedAddr {
    /// Returns `true` once at least `ttl` has passed since `time_created`.
    pub fn is_outdated(&self) -> bool {
        self.time_created.elapsed() >= self.ttl
    }
}
693
694enum InternalMode {
695    Plain,
696    #[cfg(feature = "tls")]
697    Secure { tls_config: Arc<rustls::ClientConfig>, server_name: rustls::ServerName }
698}
699
700impl InternalMode {
701
702    #[cfg(feature = "tls")]
703    pub(crate) fn from_mode(mode: Mode, tls_config: &Arc<rustls::ClientConfig>, host: &str) -> Self {
704        match mode {
705            Mode::Plain => Self::Plain,
706            Mode::Secure => Self::Secure {
707                tls_config: Arc::clone(tls_config),
708                server_name: host.try_into().expect("invalid host name")
709            },
710        }
711    }
712
713    #[cfg(not(feature = "tls"))]
714    pub(crate) fn from_mode(_mode: Mode, _tls_config: &(), _host: &str) -> Self {
715        Self::Plain
716    }
717
718}
719
720enum Connection {
721    Plain { tcp_stream: TcpStream },
722    #[cfg(feature = "tls")]
723    Secure { stream: rustls::StreamOwned<rustls::ClientConnection, TcpStream> },
724}
725
726impl Connection {
727
728    pub(crate) fn new(ip_addr: Ipv4Addr, mode: InternalMode) -> io::Result<Self> {
729
730        match mode {
731            InternalMode::Plain => {
732                let tcp_stream = TcpStream::connect(make_socket_addr(ip_addr, 80))?;
733                Ok(Self::Plain { tcp_stream })
734            },
735            #[cfg(feature = "tls")]
736            InternalMode::Secure { tls_config, server_name } => {
737                let tcp_stream = TcpStream::connect(make_socket_addr(ip_addr, 443))?;
738                let tls_connection = rustls::ClientConnection::new(tls_config, server_name).map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
739                let stream = rustls::StreamOwned::new(tls_connection, tcp_stream);
740                Ok(Self::Secure { stream })
741            }
742        }
743
744    }
745
746    pub(crate) fn peer_addr(&self) -> io::Result<SocketAddr> {
747        self.tcp_stream().peer_addr()
748    }
749
750    fn tcp_stream(&self) -> &TcpStream {
751        match self {
752            Self::Plain { tcp_stream } => tcp_stream,
753            #[cfg(feature = "tls")]
754            Self::Secure { stream } => &stream.sock,
755        }
756    }
757
758    fn tcp_stream_mut(&mut self) -> &mut TcpStream {
759        match self {
760            Self::Plain { tcp_stream } => tcp_stream,
761            #[cfg(feature = "tls")]
762            Self::Secure { stream } => &mut stream.sock,
763        }
764    }
765
766    pub(crate) fn complete_io(&mut self) -> io::Result<()> {
767
768        #[cfg(feature = "tls")]
769        if let Connection::Secure { stream } = self {
770            match stream.conn.complete_io(&mut stream.sock) {
771                Ok(..) => (),
772                Err(err) if wouldblock(&err) => (),
773                Err(other) => return Err(other),
774            };
775        }
776
777        Ok(())
778
779    }
780
781}
782
783impl mio::event::Source for Connection {
784    fn register(&mut self, registry: &mio::Registry, token: mio::Token, interests: mio::Interest) -> io::Result<()> {
785        self.tcp_stream_mut().register(registry, token, interests)
786    }
787    fn reregister(&mut self, registry: &mio::Registry, token: mio::Token, interests: mio::Interest) -> io::Result<()> {
788        self.tcp_stream_mut().reregister(registry, token, interests)
789    }
790    fn deregister(&mut self, registry: &mio::Registry) -> io::Result<()> {
791        self.tcp_stream_mut().deregister(registry)
792    }
793}
794
795impl Read for Connection {
796
797    fn read(&mut self, buff: &mut [u8]) -> io::Result<usize> {
798        match self {
799            Self::Plain  { tcp_stream } => tcp_stream.read(buff),
800            #[cfg(feature = "tls")]
801            Self::Secure { stream } => stream.read(buff)
802        }
803    }
804
805}
806
807impl Write for Connection {
808
809    fn write(&mut self, buff: &[u8]) -> io::Result<usize> {
810        match self {
811            Self::Plain  { tcp_stream } => tcp_stream.write(buff),
812            #[cfg(feature = "tls")]
813            Self::Secure { stream } => stream.write(buff)
814        }
815    }
816
817    fn flush(&mut self) -> io::Result<()> {
818        match self {
819            Self::Plain  { tcp_stream } => tcp_stream.flush(),
820            #[cfg(feature = "tls")]
821            Self::Secure { stream } => stream.flush()
822        }
823    }
824
825}
826