http_pool/http2/
sender.rs

1use crate::body::VariantBody;
2use hyper::body::Incoming;
3use hyper::client::conn::{TrySendError, http2};
4use hyper::http::uri::InvalidUri;
5use hyper::{Method, Request, Response, Uri, Version, http};
6use std::fmt::Debug;
7use std::ops::{Deref, DerefMut};
8use std::sync::Arc;
9use crate::utils::RefCount;
10
11#[derive(Debug, Clone)]
12pub(crate) struct SendRequest {
13    inner: http2::SendRequest<VariantBody>,
14    _ref: Arc<()>,
15}
16
17impl SendRequest {
18    pub(crate) fn new(inner: http2::SendRequest<VariantBody>) -> Self {
19        Self {
20            inner,
21            _ref: Arc::new(()),
22        }
23    }
24
25    /// 引用数量
26    pub(crate) fn ref_count(&self) -> usize {
27        Arc::strong_count(&self._ref)
28    }
29
30    /// 是否达到最大数
31    pub(crate) fn limited(&self, max_streams: Option<usize>) -> bool {
32        if let Some(max) = max_streams {
33            // 这里要加2, 因为队列里一份, run那里一份
34            if self.ref_count() >= max + 2 {
35                return true;
36            }
37        }
38        false
39    }
40}
41
42impl Deref for SendRequest {
43    type Target = http2::SendRequest<VariantBody>;
44    fn deref(&self) -> &Self::Target {
45        &self.inner
46    }
47}
48
49impl DerefMut for SendRequest {
50    fn deref_mut(&mut self) -> &mut Self::Target {
51        &mut self.inner
52    }
53}
54
55impl PartialEq for SendRequest {
56    fn eq(&self, other: &Self) -> bool {
57        self._ref.as_ref() as *const _ == other._ref.as_ref() as *const _
58    }
59}
60
61impl RefCount for SendRequest {
62    fn ref_count(&self) -> usize {
63        self.ref_count()
64    }
65}
66
67#[derive(Debug)]
68pub struct Sender {
69    sender: SendRequest,
70    base_url: String,
71}
72
73impl Sender {
74    pub(crate) fn new(sender: SendRequest, base_url: String) -> Self {
75        Sender { sender, base_url }
76    }
77
78    #[allow(dead_code)]
79    pub(crate) fn from_http2_sr(sr: http2::SendRequest<VariantBody>, base_url: String) -> Self {
80        Self::new(SendRequest::new(sr), base_url)
81    }
82
83    pub fn base_url(&self) -> &String {
84        &self.base_url
85    }
86
87    pub fn new_uri(&self, uri: &Uri) -> Result<Uri, InvalidUri> {
88        crate::utils::new_uri(self.base_url.clone(), uri)
89    }
90
91    pub fn is_ready(&self) -> bool {
92        self.sender.is_ready()
93    }
94
95    pub fn is_closed(&self) -> bool {
96        self.sender.is_closed()
97    }
98
99    pub fn send_request(
100        &mut self,
101        req: Request<VariantBody>,
102    ) -> impl Future<Output = hyper::Result<Response<Incoming>>> {
103        self.sender.send_request(req)
104    }
105
106    pub fn try_send_request(
107        &mut self,
108        req: Request<VariantBody>,
109    ) -> impl Future<Output = Result<Response<Incoming>, TrySendError<Request<VariantBody>>>> {
110        self.sender.try_send_request(req)
111    }
112}
113
114pub fn request_builder<T>(uri: T, method: Method) -> http::request::Builder
115where
116    T: TryInto<Uri>,
117    <T as TryInto<Uri>>::Error: Into<http::Error>,
118{
119    Request::builder()
120        .version(Version::HTTP_2)
121        .method(method)
122        .uri(uri)
123        .header(hyper::header::USER_AGENT, "proxy/0.1")
124}