1use std::{
30 convert::Infallible,
31 sync::Arc,
32 task::{Context, Poll},
33};
34
35use bytes::Bytes;
36use futures_util::future::{self, Ready};
37use http::{Request, Response};
38use http_body::Body;
39use http_body_util::Empty;
40use hyper::service::Service as HyperSvc;
41use tower_service::Service as TowerSvc;
42
43use crate::{
44 body::ResponseBody, config::EngineIoConfig, engine::EngineIo, handler::EngineIoHandler,
45};
46
47mod futures;
48mod parser;
49
50pub use self::parser::{ProtocolVersion, TransportType};
51use self::{futures::ResponseFuture, parser::dispatch_req};
52
53pub struct EngineIoService<H: EngineIoHandler, S = NotFoundService> {
59 inner: S,
60 engine: Arc<EngineIo<H>>,
61}
62
63impl<H: EngineIoHandler> EngineIoService<H, NotFoundService> {
64 pub fn new(handler: Arc<H>) -> Self {
67 EngineIoService::with_config(handler, EngineIoConfig::default())
68 }
69 pub fn with_config(handler: Arc<H>, config: EngineIoConfig) -> Self {
71 EngineIoService::with_config_inner(NotFoundService, handler, config)
72 }
73}
74
75impl<S: Clone, H: EngineIoHandler> EngineIoService<H, S> {
76 pub fn with_inner(inner: S, handler: Arc<H>) -> Self {
78 EngineIoService::with_config_inner(inner, handler, EngineIoConfig::default())
79 }
80
81 pub fn with_config_inner(inner: S, handler: Arc<H>, config: EngineIoConfig) -> Self {
83 EngineIoService {
84 inner,
85 engine: Arc::new(EngineIo::new(handler, config)),
86 }
87 }
88
89 pub fn into_make_service(self) -> MakeEngineIoService<H, S> {
92 MakeEngineIoService::new(self)
93 }
94}
95
96impl<S: Clone, H: EngineIoHandler> Clone for EngineIoService<H, S> {
97 fn clone(&self) -> Self {
98 EngineIoService {
99 inner: self.inner.clone(),
100 engine: self.engine.clone(),
101 }
102 }
103}
104impl<H: EngineIoHandler, S> std::fmt::Debug for EngineIoService<H, S> {
105 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106 f.debug_struct("EngineIoService").finish()
107 }
108}
109
110impl<H, ReqBody, ResBody, S> TowerSvc<Request<ReqBody>> for EngineIoService<H, S>
112where
113 H: EngineIoHandler,
114 ReqBody: Body + Send + Unpin + 'static + std::fmt::Debug,
115 ReqBody::Error: std::fmt::Debug,
116 ReqBody::Data: Send,
117 ResBody: Body + Send + 'static,
118 S: TowerSvc<Request<ReqBody>, Response = Response<ResBody>>,
119{
120 type Response = Response<ResponseBody<ResBody>>;
121 type Error = S::Error;
122 type Future = ResponseFuture<S::Future, ResBody>;
123
124 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
125 self.inner.poll_ready(cx)
126 }
127
128 fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
129 let path = self.engine.config.req_path.as_ref();
130 if req.uri().path().starts_with(path) {
131 dispatch_req(req, self.engine.clone())
132 } else {
133 ResponseFuture::new(self.inner.call(req))
134 }
135 }
136}
137
138impl<H, ReqBody, ResBody, S> HyperSvc<Request<ReqBody>> for EngineIoService<H, S>
140where
141 H: EngineIoHandler,
142 ReqBody: Body + Send + Unpin + 'static + std::fmt::Debug,
143 ReqBody::Error: std::fmt::Debug,
144 ReqBody::Data: Send,
145 ResBody: Body + Send + 'static,
146 S: HyperSvc<Request<ReqBody>, Response = Response<ResBody>>,
147{
148 type Response = Response<ResponseBody<ResBody>>;
149 type Error = S::Error;
150 type Future = ResponseFuture<S::Future, ResBody>;
151
152 fn call(&self, req: Request<ReqBody>) -> Self::Future {
153 let path = self.engine.config.req_path.as_ref();
154 if req.uri().path().starts_with(path) {
155 dispatch_req(req, self.engine.clone())
156 } else {
157 ResponseFuture::new(self.inner.call(req))
158 }
159 }
160}
161
#[cfg(feature = "__test_harness")]
#[doc(hidden)]
impl<H: EngineIoHandler, Svc> EngineIoService<H, Svc> {
    /// Test-harness entry point that starts the websocket transport directly on `conn`.
    ///
    /// All arguments are forwarded unchanged to `crate::transport::ws::on_init` together
    /// with a new handle to this service's engine; the future it returns is handed back
    /// to the caller as-is.
    pub fn ws_init<S>(
        &self,
        conn: S,
        protocol: ProtocolVersion,
        sid: Option<engineioxide_core::Sid>,
        req_data: http::request::Parts,
    ) -> impl std::future::Future<Output = Result<(), crate::errors::Error>> + 'static
    where
        S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
    {
        crate::transport::ws::on_init(Arc::clone(&self.engine), conn, protocol, sid, req_data)
    }
}
184
185pub struct MakeEngineIoService<H: EngineIoHandler, S> {
187 svc: EngineIoService<H, S>,
188}
189
190impl<H: EngineIoHandler, S> MakeEngineIoService<H, S> {
191 pub fn new(svc: EngineIoService<H, S>) -> Self {
193 MakeEngineIoService { svc }
194 }
195}
196
197impl<H: EngineIoHandler, S: Clone, T> TowerSvc<T> for MakeEngineIoService<H, S> {
198 type Response = EngineIoService<H, S>;
199
200 type Error = Infallible;
201
202 type Future = Ready<Result<Self::Response, Self::Error>>;
203
204 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
205 Poll::Ready(Ok(()))
206 }
207
208 fn call(&mut self, _req: T) -> Self::Future {
209 future::ready(Ok(self.svc.clone()))
210 }
211}
212
/// Stateless fallback service that answers every request with a `404` response.
///
/// It is the default inner service of [`EngineIoService`].
#[derive(Clone, Debug)]
pub struct NotFoundService;
216
217impl<ReqBody> TowerSvc<Request<ReqBody>> for NotFoundService {
219 type Response = Response<ResponseBody<Empty<Bytes>>>;
220 type Error = Infallible;
221 type Future = Ready<Result<Response<ResponseBody<Empty<Bytes>>>, Infallible>>;
222
223 fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
224 Poll::Ready(Ok(()))
225 }
226
227 fn call(&mut self, _: Request<ReqBody>) -> Self::Future {
228 future::ready(Ok(Response::builder()
229 .status(404)
230 .body(ResponseBody::empty_response())
231 .unwrap()))
232 }
233}
234
235impl<ReqBody> HyperSvc<Request<ReqBody>> for NotFoundService {
237 type Response = Response<ResponseBody<Empty<Bytes>>>;
238 type Error = Infallible;
239 type Future = Ready<Result<Response<ResponseBody<Empty<Bytes>>>, Infallible>>;
240
241 fn call(&self, _: Request<ReqBody>) -> Self::Future {
242 future::ready(Ok(Response::builder()
243 .status(404)
244 .body(ResponseBody::empty_response())
245 .unwrap()))
246 }
247}