engineioxide/service/
mod.rs

1//! ## A tower [`Service`](tower_service::Service) for engine.io so it can be used with frameworks supporting tower services
2//!
3//!
//! #### Example with a `hyper` standalone service:
5//!
6//! ```rust
7//! # use bytes::Bytes;
8//! # use engineioxide::layer::EngineIoLayer;
9//! # use engineioxide::handler::EngineIoHandler;
10//! # use engineioxide::service::EngineIoService;
11//! # use engineioxide::{Socket, DisconnectReason, Str};
12//! # use std::sync::Arc;
13//! #[derive(Debug)]
14//! struct MyHandler;
15//!
16//! impl EngineIoHandler for MyHandler {
17//!     type Data = ();
18//!     fn on_connect(self: Arc<Self>, socket: Arc<Socket<()>>) { }
19//!     fn on_disconnect(&self, socket: Arc<Socket<()>>, reason: DisconnectReason) { }
20//!     fn on_message(self: &Arc<Self>, msg: Str, socket: Arc<Socket<()>>) { }
21//!     fn on_binary(self: &Arc<Self>, data: Bytes, socket: Arc<Socket<()>>) { }
22//! }
23//!
24//! // Create a new engine.io service that will return a 404 not found response for other requests
25//! let service = EngineIoService::new(Arc::new(MyHandler))
26//!     .into_make_service(); // Create a MakeService from the EngineIoService to give it to hyper
27//! ```
28
29use std::{
30    convert::Infallible,
31    sync::Arc,
32    task::{Context, Poll},
33};
34
35use bytes::Bytes;
36use futures_util::future::{self, Ready};
37use http::{Request, Response};
38use http_body::Body;
39use http_body_util::Empty;
40use hyper::service::Service as HyperSvc;
41use tower_service::Service as TowerSvc;
42
43use crate::{
44    body::ResponseBody, config::EngineIoConfig, engine::EngineIo, handler::EngineIoHandler,
45};
46
47mod futures;
48mod parser;
49
50pub use self::parser::{ProtocolVersion, TransportType};
51use self::{futures::ResponseFuture, parser::dispatch_req};
52
53/// A `Service` that handles engine.io requests as a middleware.
54/// If the request is not an engine.io request, it forwards it to the inner service.
55/// If it is an engine.io request it will forward it to the appropriate `transport`.
56///
57/// By default, it uses a [`NotFoundService`] as the inner service so it can be used as a standalone [`Service`](TowerSvc).
58pub struct EngineIoService<H: EngineIoHandler, S = NotFoundService> {
59    inner: S,
60    engine: Arc<EngineIo<H>>,
61}
62
63impl<H: EngineIoHandler> EngineIoService<H, NotFoundService> {
64    /// Create a new [`EngineIoService`] with a [`NotFoundService`] as the inner service.
65    /// If the request is not an `EngineIo` request, it will always return a 404 response.
66    pub fn new(handler: Arc<H>) -> Self {
67        EngineIoService::with_config(handler, EngineIoConfig::default())
68    }
69    /// Create a new [`EngineIoService`] with a custom config
70    pub fn with_config(handler: Arc<H>, config: EngineIoConfig) -> Self {
71        EngineIoService::with_config_inner(NotFoundService, handler, config)
72    }
73}
74
75impl<S: Clone, H: EngineIoHandler> EngineIoService<H, S> {
76    /// Create a new [`EngineIoService`] with a custom inner service.
77    pub fn with_inner(inner: S, handler: Arc<H>) -> Self {
78        EngineIoService::with_config_inner(inner, handler, EngineIoConfig::default())
79    }
80
81    /// Create a new [`EngineIoService`] with a custom inner service and a custom config.
82    pub fn with_config_inner(inner: S, handler: Arc<H>, config: EngineIoConfig) -> Self {
83        EngineIoService {
84            inner,
85            engine: Arc::new(EngineIo::new(handler, config)),
86        }
87    }
88
89    /// Convert this [`EngineIoService`] into a [`MakeEngineIoService`].
90    /// This is useful when using [`EngineIoService`] without layers.
91    pub fn into_make_service(self) -> MakeEngineIoService<H, S> {
92        MakeEngineIoService::new(self)
93    }
94}
95
96impl<S: Clone, H: EngineIoHandler> Clone for EngineIoService<H, S> {
97    fn clone(&self) -> Self {
98        EngineIoService {
99            inner: self.inner.clone(),
100            engine: self.engine.clone(),
101        }
102    }
103}
104impl<H: EngineIoHandler, S> std::fmt::Debug for EngineIoService<H, S> {
105    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106        f.debug_struct("EngineIoService").finish()
107    }
108}
109
110/// Tower Service implementation.
111impl<H, ReqBody, ResBody, S> TowerSvc<Request<ReqBody>> for EngineIoService<H, S>
112where
113    H: EngineIoHandler,
114    ReqBody: Body + Send + Unpin + 'static + std::fmt::Debug,
115    ReqBody::Error: std::fmt::Debug,
116    ReqBody::Data: Send,
117    ResBody: Body + Send + 'static,
118    S: TowerSvc<Request<ReqBody>, Response = Response<ResBody>>,
119{
120    type Response = Response<ResponseBody<ResBody>>;
121    type Error = S::Error;
122    type Future = ResponseFuture<S::Future, ResBody>;
123
124    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
125        self.inner.poll_ready(cx)
126    }
127
128    fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
129        let path = self.engine.config.req_path.as_ref();
130        if req.uri().path().starts_with(path) {
131            dispatch_req(req, self.engine.clone())
132        } else {
133            ResponseFuture::new(self.inner.call(req))
134        }
135    }
136}
137
138/// Hyper 1.0 Service implementation.
139impl<H, ReqBody, ResBody, S> HyperSvc<Request<ReqBody>> for EngineIoService<H, S>
140where
141    H: EngineIoHandler,
142    ReqBody: Body + Send + Unpin + 'static + std::fmt::Debug,
143    ReqBody::Error: std::fmt::Debug,
144    ReqBody::Data: Send,
145    ResBody: Body + Send + 'static,
146    S: HyperSvc<Request<ReqBody>, Response = Response<ResBody>>,
147{
148    type Response = Response<ResponseBody<ResBody>>;
149    type Error = S::Error;
150    type Future = ResponseFuture<S::Future, ResBody>;
151
152    fn call(&self, req: Request<ReqBody>) -> Self::Future {
153        let path = self.engine.config.req_path.as_ref();
154        if req.uri().path().starts_with(path) {
155            dispatch_req(req, self.engine.clone())
156        } else {
157            ResponseFuture::new(self.inner.call(req))
158        }
159    }
160}
161
#[cfg(feature = "__test_harness")]
#[doc(hidden)]
impl<H, Svc> EngineIoService<H, Svc>
where
    H: EngineIoHandler,
{
    /// Start a new engine.io connection over websocket on top of a raw stream.
    ///
    /// Delegates to `crate::transport::ws::on_init` with a clone of this service's
    /// engine handle. Mostly used for testing.
    pub fn ws_init<S>(
        &self,
        conn: S,
        protocol: ProtocolVersion,
        sid: Option<engineioxide_core::Sid>,
        req_data: http::request::Parts,
    ) -> impl std::future::Future<Output = Result<(), crate::errors::Error>> + 'static
    where
        S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
    {
        crate::transport::ws::on_init(Arc::clone(&self.engine), conn, protocol, sid, req_data)
    }
}
184
185/// A MakeService that always returns a clone of the [`EngineIoService`] it was created with.
186pub struct MakeEngineIoService<H: EngineIoHandler, S> {
187    svc: EngineIoService<H, S>,
188}
189
190impl<H: EngineIoHandler, S> MakeEngineIoService<H, S> {
191    /// Create a new [`MakeEngineIoService`] with a custom inner service.
192    pub fn new(svc: EngineIoService<H, S>) -> Self {
193        MakeEngineIoService { svc }
194    }
195}
196
197impl<H: EngineIoHandler, S: Clone, T> TowerSvc<T> for MakeEngineIoService<H, S> {
198    type Response = EngineIoService<H, S>;
199
200    type Error = Infallible;
201
202    type Future = Ready<Result<Self::Response, Self::Error>>;
203
204    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
205        Poll::Ready(Ok(()))
206    }
207
208    fn call(&mut self, _req: T) -> Self::Future {
209        future::ready(Ok(self.svc.clone()))
210    }
211}
212
/// A `Service` that always returns a 404 response and that is compatible with [`EngineIoService`].
///
/// It is a stateless unit type, so it also implements `Default` in addition to
/// `Debug` and `Clone`.
#[derive(Debug, Clone, Default)]
pub struct NotFoundService;
216
217/// Implement a custom tower [`Service`](TowerSvc) for the [`NotFoundService`]
218impl<ReqBody> TowerSvc<Request<ReqBody>> for NotFoundService {
219    type Response = Response<ResponseBody<Empty<Bytes>>>;
220    type Error = Infallible;
221    type Future = Ready<Result<Response<ResponseBody<Empty<Bytes>>>, Infallible>>;
222
223    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
224        Poll::Ready(Ok(()))
225    }
226
227    fn call(&mut self, _: Request<ReqBody>) -> Self::Future {
228        future::ready(Ok(Response::builder()
229            .status(404)
230            .body(ResponseBody::empty_response())
231            .unwrap()))
232    }
233}
234
235/// Implement a custom hyper [`Service`](HyperSvc) for the [`NotFoundService`]
236impl<ReqBody> HyperSvc<Request<ReqBody>> for NotFoundService {
237    type Response = Response<ResponseBody<Empty<Bytes>>>;
238    type Error = Infallible;
239    type Future = Ready<Result<Response<ResponseBody<Empty<Bytes>>>, Infallible>>;
240
241    fn call(&self, _: Request<ReqBody>) -> Self::Future {
242        future::ready(Ok(Response::builder()
243            .status(404)
244            .body(ResponseBody::empty_response())
245            .unwrap()))
246    }
247}