rtv/client.rs
1
2//! This module contains an HTTP [`Client`] that runs ontop of `mio`.
3
4use mio::net::TcpStream;
5use chunked_transfer::Decoder as ChunkedDecoder;
6use std::{io::{self, Write, Read}, time::{Duration, Instant}, collections::HashMap, net::{SocketAddr, Ipv4Addr}, mem::replace};
7use crate::{dns, util::{make_socket_addr, notconnected, register_all, wouldblock, hash}, ResponseHead, ReqId, Response, ResponseState, Mode, Status, OwnedHeader, RawRequest};
8
9#[cfg(feature = "tls")]
10use std::sync::Arc;
11
12/// A flexible HTTP client.
13///
14/// Use the client if you wanna have controll over `mio` yourself.
15/// You should look at the documentation of the individual methods for more info on
16/// what exactly they do.
17///
18/// In general, you pass the client a handle to your `Poll` when you send a request.
19/// Inside you `mio` event loop, when you get an event, you then call the [`Client::pump`] function,
20/// which drives the request to completion.
21///
22/// # Example
23///
24/// This is more or less a full blown example on what it takes to correctly
25/// send a request.
26///
27/// ```rust
28///
29/// let io = mio::Poll::new()?;
30/// let mut client = rtv::Client::new(mio::Token(0));
31///
32/// let request = Request::get().host("example.com").https();
33/// let _id = client.send(&io, mio::Token(2), request)?;
34/// // ^^^ the returned id can be used to check which response belongs to which request
35/// // although we are just sending one request here so this isn't needed
36///
37/// // we have to store the body ourselfes
38/// let mut response_body = Vec::new();
39///
40/// 'ev: loop {
41///
42/// // see note below on how to handle timeouts
43/// io.poll(&mut events, client.timeout())?;
44///
45/// // loop over all the responses we may have gotten
46/// // you don't need to handle events generated by rtv in any other way
47/// for resp in client.pump(&io, &events)? {
48/// match resp.state {
49/// rtv::ResponseState::Head(head) => {
50/// // the head contains headers etc.
51/// pritnln!("Content-Length: {}", head.content_length);
52/// pritnln!("Some header: {}", head.headers[0]);
53/// },
54/// rtv::ResponseState::Data(some_data) => {
55/// // you will receive data in small chunks as it comes in
56/// response_body.extend(some_data);
57/// },
58/// rtv::ResponseState::Done => {
59/// break 'ev;
60/// },
61/// // maybe a timeout or I/O error
62/// other => panic!("Error: {}", other),
63/// };
64/// };
65///
66/// events.clear();
67///
68/// }
69///
70/// let body_str = str::from_utf8(&response_body)?;
71/// println!("{}", body_str);
72///
73/// ```
74///
75/// # Timeouts
76///
77/// Rtv supports a timeout for every individual request. It will even be
78/// applied to dns resolution.
79///
80/// You have to specify this timeout in two places. First, when creating your
81/// `Request` and then once again when waiting for events with `mio`.
82///
83/// The timeout used with `mio` always has to match the smallest time left
84/// for any request currently in progress, so that the `Client` can terminate
85/// the request if the timeout is reached.
86///
87/// You could do this manually but you should probably use [`Client::timeout`]
88/// which does the calculation for you.
89pub struct Client {
90 dns: dns::DnsClient,
91 dns_cache: HashMap<u64, CachedAddr>,
92 requests: Vec<InternalReq>,
93 next_id: usize,
94 #[cfg(feature = "tls")]
95 tls_config: Arc<rustls::ClientConfig>,
96 #[cfg(not(feature = "tls"))]
97 tls_config: (),
98}
99
100impl Client {
101
102 /// Creates a new client.
103 ///
104 /// The token you pass in will be used for dns resolution as
105 /// this requires (only) one socket.
106 #[inline(always)]
107 pub fn new(token: mio::Token) -> Self {
108
109 let tls_config = Self::default_tls_config();
110
111 Self {
112 dns: dns::DnsClient::new(token),
113 dns_cache: HashMap::new(),
114 requests: Vec::new(),
115 next_id: 0,
116 tls_config,
117 }
118
119 }
120
121 /// Creates a new client with a custom [`ClientConfig`](rustls::ClientConfig).
122 ///
123 /// The token you pass in will be used for dns resolution as
124 /// this requires (only) one socket.
125 #[cfg(feature = "tls")]
126 #[inline(always)]
127 pub fn with_tls_config(token: mio::Token, tls_config: Arc<rustls::ClientConfig>) -> Self {
128
129 Self {
130 dns: dns::DnsClient::new(token),
131 dns_cache: HashMap::new(),
132 requests: Vec::new(),
133 next_id: 0,
134 tls_config,
135 }
136
137 }
138
139 /// Send a request.
140 ///
141 /// The token you pass in will be used for this request's TCP connection.
142 /// It will be available again once the request completed.
143 ///
144 /// This function will return a [`ReqId`] that can be used to check which response
145 /// belongs to which request later.
146 ///
147 /// For more information on how to create a request see [`Request`] and [`RequestBuilder`](crate::RequestBuilder).
148 /// If you wanna set a timeout, you can do so when creating a request.
149 /// This function can take anything that implements `Into<Request>` so you can pass it a
150 /// `Request` or a `RequestBuilder`, both will work.
151 ///
152 /// # Example
153 ///
154 /// ```rust
155 /// let request = Request::get().host("example.com");
156 /// client.send(&io, mio::Token(1), request)?; // io is the mio::Poll
157 /// ```
158 pub fn send(&mut self, io: &mio::Poll, token: mio::Token, input: impl Into<RawRequest>) -> io::Result<ReqId> {
159
160 let request = input.into();
161
162 let id = self.next_id;
163 self.next_id = self.next_id.wrapping_add(1);
164
165 let mode = InternalMode::from_mode(request.mode, &self.tls_config, request.host());
166
167 let maybe_cached = self.dns_cache.get(&hash(request.host()));
168 let state = match maybe_cached {
169
170 Some(cached_addr) if !cached_addr.is_outdated() => {
171
172 let mut connection = Connection::new(cached_addr.ip_addr, mode)?;
173 register_all(io, &mut connection, token)?;
174 InternalReqState::Sending {
175 body: request.bytes,
176 connection,
177 }
178
179 },
180
181 _not_cached_or_old => {
182
183 let dns_id = self.dns.resolve(io, request.host(), request.timeout)?;
184 InternalReqState::Resolving {
185 host: hash(request.host()),
186 body: request.bytes,
187 dns_id,
188 mode
189 }
190
191 },
192
193 };
194
195 let internal_req = InternalReq {
196 id,
197 token,
198 state,
199 time_created: Instant::now(),
200 timeout: request.timeout,
201 };
202
203 self.requests.push(internal_req);
204
205 Ok(ReqId { inner: id })
206
207 }
208
209 /// Drive all sent requests to completion and get the responses.
210 ///
211 /// The `pump` function must be executed everytime an event is generated which
212 /// belongs to this `Client`. You don't need to match against the event token
213 /// yourself though as this is done internally.
214 /// All events not belonging to this `Client` will be ignored.
215 ///
216 /// This function will return a `Vec` of responses, that contain the [`ReqId`] of
217 /// the request that belongs to the response.
218 /// The returned `Vec` may be empty, for example if the event belonged to dns resolution.
219 ///
220 /// In general a request will go through following stages:
221 /// 1. Dns resolution, which will generate one or more events.
222 /// 2. Receiving the head, with information about the response such as the content length.
223 /// ([`ResponseState::Head`])
224 /// 3. Receiving the body, which will generate multiple events.
225 /// ([`ResponseState::Data`])
226 /// 4. In the end either [`ResponseState::Done`] or [`ResponseState::Error`].
227 ///
228 /// # Example
229 ///
230 /// ```rust
231 /// let events = ...; // wait for new events using mio
232 /// let resps = client.pump(&io, &events)?;
233 /// if resps.is_empty() { println!("Got an event but no response yet!") }
234 /// for resp in resps {
235 /// println!("Got a response: {:?}", resp.state);
236 /// }
237 /// ```
238 ///
239 /// # Note
240 ///
241 /// The maximum response header count is currently 4096, but this will be
242 /// user-controllable in the future.
243 pub fn pump(&mut self, io: &mio::Poll, events: &mio::Events) -> io::Result<Vec<Response>> {
244
245 let mut responses = Vec::new();
246
247 let dns_resps = self.dns.pump(&io, events)?;
248
249 'rq: for request in self.requests.iter_mut() {
250
251 // finish timed out requests
252 if request.timeout.unwrap_or(Duration::MAX) <= request.time_created.elapsed() {
253
254 responses.push(Response::new(request.id, ResponseState::TimedOut));
255 request.deregister(&io)?; // todo: make io errors not "hard errors" but make them
256 // also be per-request and make it so that you can retry completing the request
257 // after an io error (maybe?)
258 request.finish_error();
259
260 } else {
261
262 if let Some(connection) = request.state.connection_mut() {
263 // we need to "pump" rustls so it can do the handshake etc.
264 connection.complete_io()?;
265 }
266
267 for event in events.iter() {
268
269 match &mut request.state {
270
271 InternalReqState::Resolving { dns_id, .. } => {
272
273 for resp in dns_resps.iter() {
274
275 if &resp.id == dns_id {
276
277 // dispatch the result
278 // we don't need to call deregister on error since
279 // we haven't registered anything yet
280 let (addr, ttl) = match resp.outcome {
281 dns::DnsOutcome::Known { addr, ttl } => (addr, ttl),
282 dns::DnsOutcome::Unknown => {
283 responses.push(Response::new(request.id, ResponseState::UnknownHost));
284 request.finish_error();
285 continue 'rq;
286 },
287 dns::DnsOutcome::ProtocolError => {
288 responses.push(Response::new(request.id, ResponseState::ProtocolError));
289 request.finish_error();
290 continue 'rq;
291 },
292 dns::DnsOutcome::TimedOut => {
293 responses.push(Response::new(request.id, ResponseState::TimedOut));
294 request.finish_error();
295 continue 'rq;
296 },
297 };
298
299 let state = replace(&mut request.state, InternalReqState::Unspecified);
300 if let InternalReqState::Resolving { body, host, mode, .. } = state {
301
302 self.dns_cache.insert(host, CachedAddr {
303 ip_addr: addr,
304 time_created: Instant::now(),
305 ttl,
306 });
307
308 let mut connection = Connection::new(addr, mode)?;
309 register_all(io, &mut connection, request.token)?;
310
311 request.state = InternalReqState::Sending { body, connection };
312
313 continue 'rq;
314
315 } else {
316 unreachable!()
317 }
318
319 }
320
321 }
322
323 },
324
325 InternalReqState::Sending { body, connection } => {
326
327 if event.token() == request.token {
328
329 match connection.peer_addr() {
330 Ok(..) => {
331
332 match connection.write(&body) {
333 Ok(..) => (),
334 // during tls handshake it blocks (since the stream is still in rustls's controll)
335 Err(err) if wouldblock(&err) => continue 'rq,
336 Err(other) => return Err(other),
337 };
338
339 let state = replace(&mut request.state, InternalReqState::Unspecified);
340 if let InternalReqState::Sending { connection, .. } = state {
341
342 request.state = InternalReqState::RecvHead {
343 connection,
344 buffer: Vec::with_capacity(1024),
345 };
346
347 } else {
348 unreachable!()
349 }
350
351 },
352 Err(err) if notconnected(&err) => continue 'rq,
353 Err(other) => return Err(other),
354 }
355
356 }
357
358 },
359
360 // this is handeled in this kinda scuffed way to avoid some code duplication
361 // after succesfully reading the `Head` the state is updated to `RecvBody`
362 // which causes both the code for `RecvHead` and `RecvBody` to run
363 InternalReqState::RecvHead { .. } |
364 InternalReqState::RecvBody { .. } => {
365
366 if event.token() == request.token {
367
368 // we will get another `writable` event after sending the payload
369 // so we have to check here that this is actually a `readable` event
370 if event.is_readable() {
371
372 if let InternalReqState::RecvHead { connection, buffer } = &mut request.state {
373
374 let mut bytes_read = buffer.len();
375 let mut closed = false;
376
377 loop {
378
379 buffer.resize(bytes_read + 2048, 0u8);
380 bytes_read += match connection.read(&mut buffer[bytes_read..]) {
381 Ok(0) => { closed = true; break },
382 Ok(num) => num,
383 Err(err) if wouldblock(&err) => break,
384 Err(other) => return Err(other),
385 };
386
387 }
388
389 buffer.truncate(bytes_read);
390
391 let mut headers = [httparse::EMPTY_HEADER; 4096]; // todo: make the max header count be controllable by the user
392 let mut head = httparse::Response::new(&mut headers);
393 let status = match head.parse(&buffer) {
394 Ok(val) => val,
395 Err(_err) => {
396 responses.push(Response::new(request.id, ResponseState::ProtocolError));
397 request.finish_error();
398 continue 'rq;
399 }
400 };
401
402 if let httparse::Status::Complete(body_start) = status {
403
404 let content_length = head.headers.iter()
405 .find(|header| header.name == "Content-Length")
406 .map(|header| usize::from_str_radix(std::str::from_utf8(header.value)
407 .expect("Content-Length was invalid utf8"), 10)
408 .expect("Content-Length was not a number"))
409 .unwrap_or_default();
410
411 let transfer_chunked = head.headers.iter()
412 .find(|header| header.name == "Transfer-Encoding" && header.value == b"chunked")
413 .is_some();
414
415 responses.push(Response {
416 id: ReqId { inner: request.id },
417 state: ResponseState::Head(ResponseHead {
418 status: Status {
419 code: head.code.expect("missing status code"),
420 reason: head.reason.expect("missing reason").to_string(),
421 },
422 content_length,
423 transfer_chunked,
424 headers: head.headers.iter().map(OwnedHeader::from).collect(),
425 })
426 });
427
428 // remove the parsed head from the buffer
429 buffer.drain(..body_start);
430
431 let state = replace(&mut request.state, InternalReqState::Unspecified);
432 if let InternalReqState::RecvHead { connection, buffer } = state {
433
434 let chain = io::Cursor::new(buffer).chain(connection);
435 let recv = if transfer_chunked {
436 RecvBody::Chunked(ChunkedDecoder::new(chain))
437 } else {
438 RecvBody::Plain(chain)
439 };
440
441 request.state = InternalReqState::RecvBody {
442 recv,
443 bytes_read_total: 0,
444 content_length
445 };
446
447 // fall through to RecvBody
448
449 } else {
450 unreachable!()
451 }
452
453 } else if closed {
454 responses.push(Response::new(request.id, ResponseState::Aborted));
455 request.finish_error();
456 continue 'rq;
457 }
458
459 }
460
461 }
462
463 if let InternalReqState::RecvBody { recv, bytes_read_total, content_length } = &mut request.state {
464
465 let mut data = Vec::new();
466 let mut bytes_read = 0;
467 let mut closed = false;
468
469 loop {
470
471 data.resize(bytes_read + 2048, 0u8);
472 bytes_read += match recv.read(&mut data[bytes_read..]) {
473 Ok(0) => { closed = true; break },
474 Ok(num) => num,
475 Err(err) if wouldblock(&err) => break,
476 Err(other) => return Err(other),
477 };
478
479 }
480
481 data.truncate(bytes_read);
482
483 if bytes_read > 0 {
484
485 // return the data we just read as a response
486 responses.push(Response {
487 id: ReqId { inner: request.id },
488 state: ResponseState::Data(data),
489 });
490
491 *bytes_read_total += bytes_read;
492
493 }
494
495 let is_chunked = recv.is_chunked();
496 if is_chunked && (closed == true) ||
497 !is_chunked && (bytes_read_total >= content_length) {
498
499 responses.push(Response {
500 id: ReqId { inner: request.id },
501 state: ResponseState::Done,
502 });
503
504 request.deregister(&io)?;
505 request.finish_done();
506
507 continue 'rq
508
509 } else if closed {
510 responses.push(Response::new(request.id, ResponseState::Aborted));
511 request.finish_error();
512 continue 'rq;
513 }
514
515 }
516
517 }
518
519 },
520
521 _other => unreachable!(),
522
523 }
524
525 }
526
527 }
528
529 }
530
531 // remove all the finished requests
532 self.requests.retain(|request|
533 !request.is_finished()
534 );
535
536 Ok(responses)
537
538 }
539
540 /// Returns the smallest timeout for any of the current requests.
541 ///
542 /// Use this function to always correctly set the timeout when waiting for events with `mio`.
543 ///
544 /// # Example
545 ///
546 /// ```rust
547 /// client.send(&io, req1); // imagine 750ms timeout set on this request
548 /// client.send(&io, req2); // imagine 3s timeout set on this other one
549 /// io.poll(&mut events, client.timeout())?; // poll with smallest time left (here ~750ms)
550 /// ```
551 ///
552 /// # Note
553 ///
554 /// This function comes with a very small runtime cost sinc it has to loop over all current requests.
555 #[inline(always)]
556 pub fn timeout(&self) -> Option<Duration> {
557 let now = Instant::now();
558 self.requests.iter().filter_map(|request|
559 request.timeout.map(|timeout| timeout.checked_sub(now - request.time_created).unwrap_or(Duration::ZERO))
560 ).min()
561 }
562
563 #[cfg(feature = "tls")]
564 #[inline(always)]
565 fn default_tls_config() -> Arc<rustls::ClientConfig> {
566
567 let mut root_store = rustls::RootCertStore::empty();
568 root_store.add_trust_anchors(
569 webpki_roots::TLS_SERVER_ROOTS.0.iter().map(|ta|
570 rustls::OwnedTrustAnchor::from_subject_spki_name_constraints(ta.subject, ta.spki, ta.name_constraints)
571 )
572 );
573
574 let config = rustls::ClientConfig::builder()
575 .with_safe_defaults()
576 .with_root_certificates(root_store)
577 .with_no_client_auth();
578
579 Arc::new(config)
580
581 }
582
583 #[cfg(not(feature = "tls"))]
584 fn default_tls_config() -> () {
585 ()
586 }
587
588}
589
590struct InternalReq {
591 id: usize,
592 token: mio::Token,
593 time_created: Instant,
594 timeout: Option<Duration>,
595 state: InternalReqState,
596}
597
598impl InternalReq {
599 pub fn deregister(&mut self, io: &mio::Poll) -> io::Result<()> {
600 if let Some(conn) = self.state.connection_mut() {
601 io.registry().deregister(conn)
602 } else {
603 Ok(())
604 }
605 }
606 pub fn finish_done(&mut self) {
607 let _unused = replace(&mut self.state, InternalReqState::Done);
608 }
609 pub fn finish_error(&mut self) {
610 let _unused = replace(&mut self.state, InternalReqState::Error);
611 }
612 pub fn is_finished(&self) -> bool {
613 matches!(&self.state, InternalReqState::Done | InternalReqState::Error)
614 }
615}
616
617enum InternalReqState {
618 Unspecified,
619 Error,
620 Done,
621 Resolving {
622 body: Vec<u8>, // sent later
623 dns_id: dns::DnsId,
624 host: u64, // hashed, used for caching
625 mode: InternalMode, // used to create the connection later
626 },
627 Sending {
628 body: Vec<u8>, // sent during this state
629 connection: Connection,
630 },
631 RecvHead {
632 connection: Connection,
633 buffer: Vec<u8>,
634 },
635 RecvBody {
636 recv: RecvBody,
637 bytes_read_total: usize,
638 content_length: usize,
639 },
640}
641
642impl InternalReqState {
643 pub fn connection_mut(&mut self) -> Option<&mut Connection> {
644 match self {
645 Self::Sending { connection, .. } => Some(connection),
646 Self::RecvHead { connection, .. } => Some(connection),
647 Self::RecvBody { recv, .. } => Some(recv.connection_mut()),
648 _other => None,
649 }
650 }
651}
652
653enum RecvBody {
654 Plain(io::Chain<io::Cursor<Vec<u8>>, Connection>),
655 Chunked(ChunkedDecoder<io::Chain<io::Cursor<Vec<u8>>, Connection>>)
656}
657
658impl RecvBody {
659 pub fn connection_mut(&mut self) -> &mut Connection {
660 match self {
661 Self::Plain(conn) => conn.get_mut().1,
662 Self::Chunked(decoder) => decoder.get_mut().get_mut().1
663 }
664 }
665 pub fn is_chunked(&self) -> bool {
666 match self {
667 Self::Plain(..) => false,
668 Self::Chunked(..) => true
669 }
670 }
671}
672
673impl io::Read for RecvBody {
674 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
675 match self {
676 Self::Plain(conn) => conn.read(buf),
677 Self::Chunked(decoder) => decoder.read(buf)
678 }
679 }
680}
681
682struct CachedAddr {
683 pub ip_addr: Ipv4Addr,
684 pub time_created: Instant,
685 pub ttl: Duration,
686}
687
688impl CachedAddr {
689 pub fn is_outdated(&self) -> bool {
690 self.ttl <= self.time_created.elapsed()
691 }
692}
693
694enum InternalMode {
695 Plain,
696 #[cfg(feature = "tls")]
697 Secure { tls_config: Arc<rustls::ClientConfig>, server_name: rustls::ServerName }
698}
699
700impl InternalMode {
701
702 #[cfg(feature = "tls")]
703 pub(crate) fn from_mode(mode: Mode, tls_config: &Arc<rustls::ClientConfig>, host: &str) -> Self {
704 match mode {
705 Mode::Plain => Self::Plain,
706 Mode::Secure => Self::Secure {
707 tls_config: Arc::clone(tls_config),
708 server_name: host.try_into().expect("invalid host name")
709 },
710 }
711 }
712
713 #[cfg(not(feature = "tls"))]
714 pub(crate) fn from_mode(_mode: Mode, _tls_config: &(), _host: &str) -> Self {
715 Self::Plain
716 }
717
718}
719
720enum Connection {
721 Plain { tcp_stream: TcpStream },
722 #[cfg(feature = "tls")]
723 Secure { stream: rustls::StreamOwned<rustls::ClientConnection, TcpStream> },
724}
725
726impl Connection {
727
728 pub(crate) fn new(ip_addr: Ipv4Addr, mode: InternalMode) -> io::Result<Self> {
729
730 match mode {
731 InternalMode::Plain => {
732 let tcp_stream = TcpStream::connect(make_socket_addr(ip_addr, 80))?;
733 Ok(Self::Plain { tcp_stream })
734 },
735 #[cfg(feature = "tls")]
736 InternalMode::Secure { tls_config, server_name } => {
737 let tcp_stream = TcpStream::connect(make_socket_addr(ip_addr, 443))?;
738 let tls_connection = rustls::ClientConnection::new(tls_config, server_name).map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
739 let stream = rustls::StreamOwned::new(tls_connection, tcp_stream);
740 Ok(Self::Secure { stream })
741 }
742 }
743
744 }
745
746 pub(crate) fn peer_addr(&self) -> io::Result<SocketAddr> {
747 self.tcp_stream().peer_addr()
748 }
749
750 fn tcp_stream(&self) -> &TcpStream {
751 match self {
752 Self::Plain { tcp_stream } => tcp_stream,
753 #[cfg(feature = "tls")]
754 Self::Secure { stream } => &stream.sock,
755 }
756 }
757
758 fn tcp_stream_mut(&mut self) -> &mut TcpStream {
759 match self {
760 Self::Plain { tcp_stream } => tcp_stream,
761 #[cfg(feature = "tls")]
762 Self::Secure { stream } => &mut stream.sock,
763 }
764 }
765
766 pub(crate) fn complete_io(&mut self) -> io::Result<()> {
767
768 #[cfg(feature = "tls")]
769 if let Connection::Secure { stream } = self {
770 match stream.conn.complete_io(&mut stream.sock) {
771 Ok(..) => (),
772 Err(err) if wouldblock(&err) => (),
773 Err(other) => return Err(other),
774 };
775 }
776
777 Ok(())
778
779 }
780
781}
782
783impl mio::event::Source for Connection {
784 fn register(&mut self, registry: &mio::Registry, token: mio::Token, interests: mio::Interest) -> io::Result<()> {
785 self.tcp_stream_mut().register(registry, token, interests)
786 }
787 fn reregister(&mut self, registry: &mio::Registry, token: mio::Token, interests: mio::Interest) -> io::Result<()> {
788 self.tcp_stream_mut().reregister(registry, token, interests)
789 }
790 fn deregister(&mut self, registry: &mio::Registry) -> io::Result<()> {
791 self.tcp_stream_mut().deregister(registry)
792 }
793}
794
795impl Read for Connection {
796
797 fn read(&mut self, buff: &mut [u8]) -> io::Result<usize> {
798 match self {
799 Self::Plain { tcp_stream } => tcp_stream.read(buff),
800 #[cfg(feature = "tls")]
801 Self::Secure { stream } => stream.read(buff)
802 }
803 }
804
805}
806
807impl Write for Connection {
808
809 fn write(&mut self, buff: &[u8]) -> io::Result<usize> {
810 match self {
811 Self::Plain { tcp_stream } => tcp_stream.write(buff),
812 #[cfg(feature = "tls")]
813 Self::Secure { stream } => stream.write(buff)
814 }
815 }
816
817 fn flush(&mut self) -> io::Result<()> {
818 match self {
819 Self::Plain { tcp_stream } => tcp_stream.flush(),
820 #[cfg(feature = "tls")]
821 Self::Secure { stream } => stream.flush()
822 }
823 }
824
825}
826