1use std::marker::PhantomData;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4use std::{fmt, net, rc};
5
6use actori_codec::{AsyncRead, AsyncWrite, Framed};
7use actori_rt::net::TcpStream;
8use actori_service::{pipeline_factory, IntoServiceFactory, Service, ServiceFactory};
9use bytes::Bytes;
10use futures_core::{ready, Future};
11use futures_util::future::ok;
12use h2::server::{self, Handshake};
13use pin_project::{pin_project, project};
14
15use crate::body::MessageBody;
16use crate::builder::HttpServiceBuilder;
17use crate::cloneable::CloneableService;
18use crate::config::{KeepAlive, ServiceConfig};
19use crate::error::{DispatchError, Error};
20use crate::helpers::DataFactory;
21use crate::request::Request;
22use crate::response::Response;
23use crate::{h1, h2::Dispatcher, Protocol};
24
25pub struct HttpService<T, S, B, X = h1::ExpectHandler, U = h1::UpgradeHandler<T>> {
27 srv: S,
28 cfg: ServiceConfig,
29 expect: X,
30 upgrade: Option<U>,
31 on_connect: Option<rc::Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
32 _t: PhantomData<(T, B)>,
33}
34
35impl<T, S, B> HttpService<T, S, B>
36where
37 S: ServiceFactory<Config = (), Request = Request>,
38 S::Error: Into<Error> + 'static,
39 S::InitError: fmt::Debug,
40 S::Response: Into<Response<B>> + 'static,
41 <S::Service as Service>::Future: 'static,
42 B: MessageBody + 'static,
43{
44 pub fn build() -> HttpServiceBuilder<T, S> {
46 HttpServiceBuilder::new()
47 }
48}
49
50impl<T, S, B> HttpService<T, S, B>
51where
52 S: ServiceFactory<Config = (), Request = Request>,
53 S::Error: Into<Error> + 'static,
54 S::InitError: fmt::Debug,
55 S::Response: Into<Response<B>> + 'static,
56 <S::Service as Service>::Future: 'static,
57 B: MessageBody + 'static,
58{
59 pub fn new<F: IntoServiceFactory<S>>(service: F) -> Self {
61 let cfg = ServiceConfig::new(KeepAlive::Timeout(5), 5000, 0, false, None);
62
63 HttpService {
64 cfg,
65 srv: service.into_factory(),
66 expect: h1::ExpectHandler,
67 upgrade: None,
68 on_connect: None,
69 _t: PhantomData,
70 }
71 }
72
73 pub(crate) fn with_config<F: IntoServiceFactory<S>>(
75 cfg: ServiceConfig,
76 service: F,
77 ) -> Self {
78 HttpService {
79 cfg,
80 srv: service.into_factory(),
81 expect: h1::ExpectHandler,
82 upgrade: None,
83 on_connect: None,
84 _t: PhantomData,
85 }
86 }
87}
88
89impl<T, S, B, X, U> HttpService<T, S, B, X, U>
90where
91 S: ServiceFactory<Config = (), Request = Request>,
92 S::Error: Into<Error> + 'static,
93 S::InitError: fmt::Debug,
94 S::Response: Into<Response<B>> + 'static,
95 <S::Service as Service>::Future: 'static,
96 B: MessageBody,
97{
98 pub fn expect<X1>(self, expect: X1) -> HttpService<T, S, B, X1, U>
104 where
105 X1: ServiceFactory<Config = (), Request = Request, Response = Request>,
106 X1::Error: Into<Error>,
107 X1::InitError: fmt::Debug,
108 <X1::Service as Service>::Future: 'static,
109 {
110 HttpService {
111 expect,
112 cfg: self.cfg,
113 srv: self.srv,
114 upgrade: self.upgrade,
115 on_connect: self.on_connect,
116 _t: PhantomData,
117 }
118 }
119
120 pub fn upgrade<U1>(self, upgrade: Option<U1>) -> HttpService<T, S, B, X, U1>
125 where
126 U1: ServiceFactory<
127 Config = (),
128 Request = (Request, Framed<T, h1::Codec>),
129 Response = (),
130 >,
131 U1::Error: fmt::Display,
132 U1::InitError: fmt::Debug,
133 <U1::Service as Service>::Future: 'static,
134 {
135 HttpService {
136 upgrade,
137 cfg: self.cfg,
138 srv: self.srv,
139 expect: self.expect,
140 on_connect: self.on_connect,
141 _t: PhantomData,
142 }
143 }
144
145 pub(crate) fn on_connect(
147 mut self,
148 f: Option<rc::Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
149 ) -> Self {
150 self.on_connect = f;
151 self
152 }
153}
154
155impl<S, B, X, U> HttpService<TcpStream, S, B, X, U>
156where
157 S: ServiceFactory<Config = (), Request = Request>,
158 S::Error: Into<Error> + 'static,
159 S::InitError: fmt::Debug,
160 S::Response: Into<Response<B>> + 'static,
161 <S::Service as Service>::Future: 'static,
162 B: MessageBody + 'static,
163 X: ServiceFactory<Config = (), Request = Request, Response = Request>,
164 X::Error: Into<Error>,
165 X::InitError: fmt::Debug,
166 <X::Service as Service>::Future: 'static,
167 U: ServiceFactory<
168 Config = (),
169 Request = (Request, Framed<TcpStream, h1::Codec>),
170 Response = (),
171 >,
172 U::Error: fmt::Display + Into<Error>,
173 U::InitError: fmt::Debug,
174 <U::Service as Service>::Future: 'static,
175{
176 pub fn tcp(
178 self,
179 ) -> impl ServiceFactory<
180 Config = (),
181 Request = TcpStream,
182 Response = (),
183 Error = DispatchError,
184 InitError = (),
185 > {
186 pipeline_factory(|io: TcpStream| {
187 let peer_addr = io.peer_addr().ok();
188 ok((io, Protocol::Http1, peer_addr))
189 })
190 .and_then(self)
191 }
192}
193
194#[cfg(feature = "openssl")]
195mod openssl {
196 use super::*;
197 use actori_tls::openssl::{Acceptor, SslAcceptor, SslStream};
198 use actori_tls::{openssl::HandshakeError, SslError};
199
200 impl<S, B, X, U> HttpService<SslStream<TcpStream>, S, B, X, U>
201 where
202 S: ServiceFactory<Config = (), Request = Request>,
203 S::Error: Into<Error> + 'static,
204 S::InitError: fmt::Debug,
205 S::Response: Into<Response<B>> + 'static,
206 <S::Service as Service>::Future: 'static,
207 B: MessageBody + 'static,
208 X: ServiceFactory<Config = (), Request = Request, Response = Request>,
209 X::Error: Into<Error>,
210 X::InitError: fmt::Debug,
211 <X::Service as Service>::Future: 'static,
212 U: ServiceFactory<
213 Config = (),
214 Request = (Request, Framed<SslStream<TcpStream>, h1::Codec>),
215 Response = (),
216 >,
217 U::Error: fmt::Display + Into<Error>,
218 U::InitError: fmt::Debug,
219 <U::Service as Service>::Future: 'static,
220 {
221 pub fn openssl(
223 self,
224 acceptor: SslAcceptor,
225 ) -> impl ServiceFactory<
226 Config = (),
227 Request = TcpStream,
228 Response = (),
229 Error = SslError<HandshakeError<TcpStream>, DispatchError>,
230 InitError = (),
231 > {
232 pipeline_factory(
233 Acceptor::new(acceptor)
234 .map_err(SslError::Ssl)
235 .map_init_err(|_| panic!()),
236 )
237 .and_then(|io: SslStream<TcpStream>| {
238 let proto = if let Some(protos) = io.ssl().selected_alpn_protocol() {
239 if protos.windows(2).any(|window| window == b"h2") {
240 Protocol::Http2
241 } else {
242 Protocol::Http1
243 }
244 } else {
245 Protocol::Http1
246 };
247 let peer_addr = io.get_ref().peer_addr().ok();
248 ok((io, proto, peer_addr))
249 })
250 .and_then(self.map_err(SslError::Service))
251 }
252 }
253}
254
255#[cfg(feature = "rustls")]
256mod rustls {
257 use super::*;
258 use actori_tls::rustls::{Acceptor, ServerConfig, Session, TlsStream};
259 use actori_tls::SslError;
260 use std::io;
261
262 impl<S, B, X, U> HttpService<TlsStream<TcpStream>, S, B, X, U>
263 where
264 S: ServiceFactory<Config = (), Request = Request>,
265 S::Error: Into<Error> + 'static,
266 S::InitError: fmt::Debug,
267 S::Response: Into<Response<B>> + 'static,
268 <S::Service as Service>::Future: 'static,
269 B: MessageBody + 'static,
270 X: ServiceFactory<Config = (), Request = Request, Response = Request>,
271 X::Error: Into<Error>,
272 X::InitError: fmt::Debug,
273 <X::Service as Service>::Future: 'static,
274 U: ServiceFactory<
275 Config = (),
276 Request = (Request, Framed<TlsStream<TcpStream>, h1::Codec>),
277 Response = (),
278 >,
279 U::Error: fmt::Display + Into<Error>,
280 U::InitError: fmt::Debug,
281 <U::Service as Service>::Future: 'static,
282 {
283 pub fn rustls(
285 self,
286 mut config: ServerConfig,
287 ) -> impl ServiceFactory<
288 Config = (),
289 Request = TcpStream,
290 Response = (),
291 Error = SslError<io::Error, DispatchError>,
292 InitError = (),
293 > {
294 let protos = vec!["h2".to_string().into(), "http/1.1".to_string().into()];
295 config.set_protocols(&protos);
296
297 pipeline_factory(
298 Acceptor::new(config)
299 .map_err(SslError::Ssl)
300 .map_init_err(|_| panic!()),
301 )
302 .and_then(|io: TlsStream<TcpStream>| {
303 let proto = if let Some(protos) = io.get_ref().1.get_alpn_protocol() {
304 if protos.windows(2).any(|window| window == b"h2") {
305 Protocol::Http2
306 } else {
307 Protocol::Http1
308 }
309 } else {
310 Protocol::Http1
311 };
312 let peer_addr = io.get_ref().0.peer_addr().ok();
313 ok((io, proto, peer_addr))
314 })
315 .and_then(self.map_err(SslError::Service))
316 }
317 }
318}
319
320impl<T, S, B, X, U> ServiceFactory for HttpService<T, S, B, X, U>
321where
322 T: AsyncRead + AsyncWrite + Unpin,
323 S: ServiceFactory<Config = (), Request = Request>,
324 S::Error: Into<Error> + 'static,
325 S::InitError: fmt::Debug,
326 S::Response: Into<Response<B>> + 'static,
327 <S::Service as Service>::Future: 'static,
328 B: MessageBody + 'static,
329 X: ServiceFactory<Config = (), Request = Request, Response = Request>,
330 X::Error: Into<Error>,
331 X::InitError: fmt::Debug,
332 <X::Service as Service>::Future: 'static,
333 U: ServiceFactory<
334 Config = (),
335 Request = (Request, Framed<T, h1::Codec>),
336 Response = (),
337 >,
338 U::Error: fmt::Display + Into<Error>,
339 U::InitError: fmt::Debug,
340 <U::Service as Service>::Future: 'static,
341{
342 type Config = ();
343 type Request = (T, Protocol, Option<net::SocketAddr>);
344 type Response = ();
345 type Error = DispatchError;
346 type InitError = ();
347 type Service = HttpServiceHandler<T, S::Service, B, X::Service, U::Service>;
348 type Future = HttpServiceResponse<T, S, B, X, U>;
349
350 fn new_service(&self, _: ()) -> Self::Future {
351 HttpServiceResponse {
352 fut: self.srv.new_service(()),
353 fut_ex: Some(self.expect.new_service(())),
354 fut_upg: self.upgrade.as_ref().map(|f| f.new_service(())),
355 expect: None,
356 upgrade: None,
357 on_connect: self.on_connect.clone(),
358 cfg: self.cfg.clone(),
359 _t: PhantomData,
360 }
361 }
362}
363
364#[doc(hidden)]
365#[pin_project]
366pub struct HttpServiceResponse<
367 T,
368 S: ServiceFactory,
369 B,
370 X: ServiceFactory,
371 U: ServiceFactory,
372> {
373 #[pin]
374 fut: S::Future,
375 #[pin]
376 fut_ex: Option<X::Future>,
377 #[pin]
378 fut_upg: Option<U::Future>,
379 expect: Option<X::Service>,
380 upgrade: Option<U::Service>,
381 on_connect: Option<rc::Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
382 cfg: ServiceConfig,
383 _t: PhantomData<(T, B)>,
384}
385
386impl<T, S, B, X, U> Future for HttpServiceResponse<T, S, B, X, U>
387where
388 T: AsyncRead + AsyncWrite + Unpin,
389 S: ServiceFactory<Request = Request>,
390 S::Error: Into<Error> + 'static,
391 S::InitError: fmt::Debug,
392 S::Response: Into<Response<B>> + 'static,
393 <S::Service as Service>::Future: 'static,
394 B: MessageBody + 'static,
395 X: ServiceFactory<Request = Request, Response = Request>,
396 X::Error: Into<Error>,
397 X::InitError: fmt::Debug,
398 <X::Service as Service>::Future: 'static,
399 U: ServiceFactory<Request = (Request, Framed<T, h1::Codec>), Response = ()>,
400 U::Error: fmt::Display,
401 U::InitError: fmt::Debug,
402 <U::Service as Service>::Future: 'static,
403{
404 type Output =
405 Result<HttpServiceHandler<T, S::Service, B, X::Service, U::Service>, ()>;
406
407 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
408 let mut this = self.as_mut().project();
409
410 if let Some(fut) = this.fut_ex.as_pin_mut() {
411 let expect = ready!(fut
412 .poll(cx)
413 .map_err(|e| log::error!("Init http service error: {:?}", e)))?;
414 this = self.as_mut().project();
415 *this.expect = Some(expect);
416 this.fut_ex.set(None);
417 }
418
419 if let Some(fut) = this.fut_upg.as_pin_mut() {
420 let upgrade = ready!(fut
421 .poll(cx)
422 .map_err(|e| log::error!("Init http service error: {:?}", e)))?;
423 this = self.as_mut().project();
424 *this.upgrade = Some(upgrade);
425 this.fut_ex.set(None);
426 }
427
428 let result = ready!(this
429 .fut
430 .poll(cx)
431 .map_err(|e| log::error!("Init http service error: {:?}", e)));
432 Poll::Ready(result.map(|service| {
433 let this = self.as_mut().project();
434 HttpServiceHandler::new(
435 this.cfg.clone(),
436 service,
437 this.expect.take().unwrap(),
438 this.upgrade.take(),
439 this.on_connect.clone(),
440 )
441 }))
442 }
443}
444
445pub struct HttpServiceHandler<T, S: Service, B, X: Service, U: Service> {
447 srv: CloneableService<S>,
448 expect: CloneableService<X>,
449 upgrade: Option<CloneableService<U>>,
450 cfg: ServiceConfig,
451 on_connect: Option<rc::Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
452 _t: PhantomData<(T, B, X)>,
453}
454
455impl<T, S, B, X, U> HttpServiceHandler<T, S, B, X, U>
456where
457 S: Service<Request = Request>,
458 S::Error: Into<Error> + 'static,
459 S::Future: 'static,
460 S::Response: Into<Response<B>> + 'static,
461 B: MessageBody + 'static,
462 X: Service<Request = Request, Response = Request>,
463 X::Error: Into<Error>,
464 U: Service<Request = (Request, Framed<T, h1::Codec>), Response = ()>,
465 U::Error: fmt::Display,
466{
467 fn new(
468 cfg: ServiceConfig,
469 srv: S,
470 expect: X,
471 upgrade: Option<U>,
472 on_connect: Option<rc::Rc<dyn Fn(&T) -> Box<dyn DataFactory>>>,
473 ) -> HttpServiceHandler<T, S, B, X, U> {
474 HttpServiceHandler {
475 cfg,
476 on_connect,
477 srv: CloneableService::new(srv),
478 expect: CloneableService::new(expect),
479 upgrade: upgrade.map(CloneableService::new),
480 _t: PhantomData,
481 }
482 }
483}
484
485impl<T, S, B, X, U> Service for HttpServiceHandler<T, S, B, X, U>
486where
487 T: AsyncRead + AsyncWrite + Unpin,
488 S: Service<Request = Request>,
489 S::Error: Into<Error> + 'static,
490 S::Future: 'static,
491 S::Response: Into<Response<B>> + 'static,
492 B: MessageBody + 'static,
493 X: Service<Request = Request, Response = Request>,
494 X::Error: Into<Error>,
495 U: Service<Request = (Request, Framed<T, h1::Codec>), Response = ()>,
496 U::Error: fmt::Display + Into<Error>,
497{
498 type Request = (T, Protocol, Option<net::SocketAddr>);
499 type Response = ();
500 type Error = DispatchError;
501 type Future = HttpServiceHandlerResponse<T, S, B, X, U>;
502
503 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
504 let ready = self
505 .expect
506 .poll_ready(cx)
507 .map_err(|e| {
508 let e = e.into();
509 log::error!("Http service readiness error: {:?}", e);
510 DispatchError::Service(e)
511 })?
512 .is_ready();
513
514 let ready = self
515 .srv
516 .poll_ready(cx)
517 .map_err(|e| {
518 let e = e.into();
519 log::error!("Http service readiness error: {:?}", e);
520 DispatchError::Service(e)
521 })?
522 .is_ready()
523 && ready;
524
525 let ready = if let Some(ref mut upg) = self.upgrade {
526 upg.poll_ready(cx)
527 .map_err(|e| {
528 let e = e.into();
529 log::error!("Http service readiness error: {:?}", e);
530 DispatchError::Service(e)
531 })?
532 .is_ready()
533 && ready
534 } else {
535 ready
536 };
537
538 if ready {
539 Poll::Ready(Ok(()))
540 } else {
541 Poll::Pending
542 }
543 }
544
545 fn call(&mut self, (io, proto, peer_addr): Self::Request) -> Self::Future {
546 let on_connect = if let Some(ref on_connect) = self.on_connect {
547 Some(on_connect(&io))
548 } else {
549 None
550 };
551
552 match proto {
553 Protocol::Http2 => HttpServiceHandlerResponse {
554 state: State::H2Handshake(Some((
555 server::handshake(io),
556 self.cfg.clone(),
557 self.srv.clone(),
558 on_connect,
559 peer_addr,
560 ))),
561 },
562 Protocol::Http1 => HttpServiceHandlerResponse {
563 state: State::H1(h1::Dispatcher::new(
564 io,
565 self.cfg.clone(),
566 self.srv.clone(),
567 self.expect.clone(),
568 self.upgrade.clone(),
569 on_connect,
570 peer_addr,
571 )),
572 },
573 }
574 }
575}
576
577#[pin_project]
578enum State<T, S, B, X, U>
579where
580 S: Service<Request = Request>,
581 S::Future: 'static,
582 S::Error: Into<Error>,
583 T: AsyncRead + AsyncWrite + Unpin,
584 B: MessageBody,
585 X: Service<Request = Request, Response = Request>,
586 X::Error: Into<Error>,
587 U: Service<Request = (Request, Framed<T, h1::Codec>), Response = ()>,
588 U::Error: fmt::Display,
589{
590 H1(#[pin] h1::Dispatcher<T, S, B, X, U>),
591 H2(#[pin] Dispatcher<T, S, B>),
592 H2Handshake(
593 Option<(
594 Handshake<T, Bytes>,
595 ServiceConfig,
596 CloneableService<S>,
597 Option<Box<dyn DataFactory>>,
598 Option<net::SocketAddr>,
599 )>,
600 ),
601}
602
603#[pin_project]
604pub struct HttpServiceHandlerResponse<T, S, B, X, U>
605where
606 T: AsyncRead + AsyncWrite + Unpin,
607 S: Service<Request = Request>,
608 S::Error: Into<Error> + 'static,
609 S::Future: 'static,
610 S::Response: Into<Response<B>> + 'static,
611 B: MessageBody + 'static,
612 X: Service<Request = Request, Response = Request>,
613 X::Error: Into<Error>,
614 U: Service<Request = (Request, Framed<T, h1::Codec>), Response = ()>,
615 U::Error: fmt::Display,
616{
617 #[pin]
618 state: State<T, S, B, X, U>,
619}
620
621impl<T, S, B, X, U> Future for HttpServiceHandlerResponse<T, S, B, X, U>
622where
623 T: AsyncRead + AsyncWrite + Unpin,
624 S: Service<Request = Request>,
625 S::Error: Into<Error> + 'static,
626 S::Future: 'static,
627 S::Response: Into<Response<B>> + 'static,
628 B: MessageBody,
629 X: Service<Request = Request, Response = Request>,
630 X::Error: Into<Error>,
631 U: Service<Request = (Request, Framed<T, h1::Codec>), Response = ()>,
632 U::Error: fmt::Display,
633{
634 type Output = Result<(), DispatchError>;
635
636 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
637 self.project().state.poll(cx)
638 }
639}
640
641impl<T, S, B, X, U> State<T, S, B, X, U>
642where
643 T: AsyncRead + AsyncWrite + Unpin,
644 S: Service<Request = Request>,
645 S::Error: Into<Error> + 'static,
646 S::Response: Into<Response<B>> + 'static,
647 B: MessageBody + 'static,
648 X: Service<Request = Request, Response = Request>,
649 X::Error: Into<Error>,
650 U: Service<Request = (Request, Framed<T, h1::Codec>), Response = ()>,
651 U::Error: fmt::Display,
652{
653 #[project]
654 fn poll(
655 mut self: Pin<&mut Self>,
656 cx: &mut Context<'_>,
657 ) -> Poll<Result<(), DispatchError>> {
658 #[project]
659 match self.as_mut().project() {
660 State::H1(disp) => disp.poll(cx),
661 State::H2(disp) => disp.poll(cx),
662 State::H2Handshake(ref mut data) => {
663 let conn = if let Some(ref mut item) = data {
664 match Pin::new(&mut item.0).poll(cx) {
665 Poll::Ready(Ok(conn)) => conn,
666 Poll::Ready(Err(err)) => {
667 trace!("H2 handshake error: {}", err);
668 return Poll::Ready(Err(err.into()));
669 }
670 Poll::Pending => return Poll::Pending,
671 }
672 } else {
673 panic!()
674 };
675 let (_, cfg, srv, on_connect, peer_addr) = data.take().unwrap();
676 self.set(State::H2(Dispatcher::new(
677 srv, conn, on_connect, cfg, None, peer_addr,
678 )));
679 self.poll(cx)
680 }
681 }
682 }
683}