Skip to main content

ntex/http/h1/
control.rs

1use std::{fmt, future::Future, io, rc::Rc};
2
3use crate::http::message::CurrentIo;
4use crate::http::{Request, Response, ResponseError, body::Body, h1::Codec};
5use crate::io::{Filter, Io, IoBoxed, IoRef};
6
7pub enum Control<F, Err> {
8    /// New connection
9    Connect(Connection<F>),
10    /// New request is loaded
11    Request(NewRequest),
12    /// Handle `Connection: UPGRADE`
13    Upgrade(Upgrade<F>),
14    /// Handle `EXPECT` header
15    Expect(Expect),
16    /// Connection is prepared to disconnect
17    Disconnect(Reason<Err>),
18}
19
20#[derive(Debug)]
21/// Disconnect reason
22pub enum Reason<Err> {
23    /// Disconnect initiated by service
24    Service(ServiceDisconnect),
25    /// Application level error
26    Error(Error<Err>),
27    /// Protocol level error
28    ProtocolError(ProtocolError),
29    /// Peer is gone
30    PeerGone(PeerGone),
31    /// Keep-alive timeout
32    KeepAlive(KeepAlive),
33}
34
/// Reason reported by a `ServiceDisconnect` control message.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServiceDisconnectReason {
    /// The server is shutting down
    Shutdown,
    /// The upgrade request has been handled by the upgrade service
    UpgradeHandled,
    /// Handling of the upgrade request failed
    UpgradeFailed,
    /// Handling of the expect control message failed
    ExpectFailed,
    /// The service is not interested in the payload, so it is not possible
    /// to continue
    PayloadDropped,
}
48
49/// Control message handling result
50#[derive(Debug)]
51pub struct ControlAck<F> {
52    pub(super) result: ControlResult<F>,
53}
54
55#[derive(Debug)]
56pub(super) enum ControlResult<F> {
57    /// Continue
58    Connect(Io<F>),
59    /// Continue
60    Continue(Request),
61    /// handle request expect
62    Expect(Request),
63    /// handle request upgrade
64    Upgrade(Request),
65    /// upgrade acked
66    UpgradeAck(Request),
67    /// upgrade handled
68    UpgradeHandled,
69    /// forward request to publish service
70    Publish(Request),
71    /// send response
72    Response(Response<()>, Body),
73    /// service error
74    Error(Response<()>, Body),
75    /// protocol error
76    ProtocolError(Response<()>, Body),
77    /// upgrade handling failed
78    UpgradeFailed(Response<()>, Body),
79    /// expect handling failed
80    ExpectFailed(Response<()>, Body),
81    /// stop connection
82    Stop,
83}
84
85impl<F, Err> Control<F, Err> {
86    pub(super) fn connect(id: usize, io: Io<F>) -> Self {
87        Control::Connect(Connection { id, io })
88    }
89
90    pub(super) fn request(req: Request) -> Self {
91        Control::Request(NewRequest(req))
92    }
93
94    pub(super) fn upgrade(req: Request, io: Rc<Io<F>>, codec: Codec) -> Self {
95        Control::Upgrade(Upgrade { req, io, codec })
96    }
97
98    pub(super) fn expect(req: Request) -> Self {
99        Control::Expect(Expect(req))
100    }
101
102    pub(super) fn err(err: Err) -> Self
103    where
104        Err: ResponseError,
105    {
106        Control::Disconnect(Reason::Error(Error::new(err)))
107    }
108
109    pub(super) fn peer_gone(err: Option<io::Error>) -> Self {
110        Control::Disconnect(Reason::PeerGone(PeerGone(err)))
111    }
112
113    pub(super) fn proto_err(err: super::ProtocolError) -> Self {
114        Control::Disconnect(Reason::ProtocolError(ProtocolError(err)))
115    }
116
117    pub(super) fn keepalive(enabled: bool) -> Self {
118        Control::Disconnect(Reason::KeepAlive(KeepAlive::new(enabled)))
119    }
120
121    pub(super) fn svc_disconnect(reason: ServiceDisconnectReason) -> Self {
122        Control::Disconnect(Reason::Service(ServiceDisconnect::new(reason)))
123    }
124
125    #[inline]
126    /// Ack control message
127    pub fn ack(self) -> ControlAck<F>
128    where
129        F: Filter,
130        Err: ResponseError,
131    {
132        match self {
133            Control::Connect(msg) => msg.ack(),
134            Control::Request(msg) => msg.ack(),
135            Control::Upgrade(msg) => msg.ack(),
136            Control::Expect(msg) => msg.ack(),
137            Control::Disconnect(msg) => msg.ack(),
138        }
139    }
140}
141
142impl<F, Err> fmt::Debug for Control<F, Err>
143where
144    Err: fmt::Debug,
145{
146    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
147        match self {
148            Control::Connect(_) => f.debug_tuple("Control::Connect").finish(),
149            Control::Request(msg) => f.debug_tuple("Control::Request").field(msg).finish(),
150            Control::Upgrade(msg) => f.debug_tuple("Control::Upgrade").field(msg).finish(),
151            Control::Expect(msg) => f.debug_tuple("Control::Expect").field(msg).finish(),
152            Control::Disconnect(msg) => {
153                f.debug_tuple("Control::Disconnect").field(msg).finish()
154            }
155        }
156    }
157}
158
159impl<Err: ResponseError> Reason<Err> {
160    pub fn ack<F>(self) -> ControlAck<F> {
161        match self {
162            Reason::Error(msg) => msg.ack(),
163            Reason::ProtocolError(msg) => msg.ack(),
164            Reason::PeerGone(msg) => msg.ack(),
165            Reason::KeepAlive(msg) => msg.ack(),
166            Reason::Service(msg) => msg.ack(),
167        }
168    }
169}
170
171#[derive(Debug)]
172pub struct Connection<F> {
173    id: usize,
174    io: Io<F>,
175}
176
177impl<F> Connection<F> {
178    #[inline]
179    pub fn id(self) -> usize {
180        self.id
181    }
182
183    #[inline]
184    /// Returns reference to Io
185    pub fn get_ref(&self) -> &Io<F> {
186        &self.io
187    }
188
189    #[inline]
190    /// Returns mut reference to Io
191    pub fn get_mut(&mut self) -> &mut Io<F> {
192        &mut self.io
193    }
194
195    #[inline]
196    /// Ack new request and continue handling process
197    pub fn ack(self) -> ControlAck<F> {
198        ControlAck {
199            result: ControlResult::Connect(self.io),
200        }
201    }
202}
203
204#[derive(Debug)]
205pub struct NewRequest(Request);
206
207impl NewRequest {
208    #[inline]
209    /// Returns reference to http request
210    pub fn get_ref(&self) -> &Request {
211        &self.0
212    }
213
214    #[inline]
215    /// Returns mut reference to http request
216    pub fn get_mut(&mut self) -> &mut Request {
217        &mut self.0
218    }
219
220    #[inline]
221    /// Ack new request and continue handling process
222    pub fn ack<F>(self) -> ControlAck<F> {
223        let result = if self.0.head().expect() {
224            ControlResult::Expect(self.0)
225        } else if self.0.upgrade() {
226            ControlResult::Upgrade(self.0)
227        } else {
228            ControlResult::Publish(self.0)
229        };
230        ControlAck { result }
231    }
232
233    #[inline]
234    /// Fail request handling
235    pub fn fail<E: ResponseError, F>(self, err: E) -> ControlAck<F> {
236        let res: Response = (&err).into();
237        let (res, body) = res.into_parts();
238
239        ControlAck {
240            result: ControlResult::Response(res, body.into()),
241        }
242    }
243
244    #[inline]
245    /// Fail request and send custom response
246    pub fn fail_with<F>(self, res: Response) -> ControlAck<F> {
247        let (res, body) = res.into_parts();
248
249        ControlAck {
250            result: ControlResult::Response(res, body.into()),
251        }
252    }
253}
254
255pub struct Upgrade<F> {
256    req: Request,
257    io: Rc<Io<F>>,
258    codec: Codec,
259}
260
261struct RequestIoAccess<F> {
262    io: Rc<Io<F>>,
263    codec: Codec,
264}
265
266impl<F> fmt::Debug for RequestIoAccess<F> {
267    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
268        f.debug_struct("RequestIoAccess")
269            .field("io", self.io.as_ref())
270            .field("codec", &self.codec)
271            .finish()
272    }
273}
274
275impl<F: Filter> crate::http::message::IoAccess for RequestIoAccess<F> {
276    fn get(&self) -> Option<&IoRef> {
277        Some(self.io.as_ref())
278    }
279
280    fn take(&self) -> Option<(IoBoxed, Codec)> {
281        Some((self.io.take().into(), self.codec.clone()))
282    }
283}
284
285impl<F: Filter> Upgrade<F> {
286    #[inline]
287    /// Returns reference to Io
288    pub fn io(&self) -> &Io<F> {
289        &self.io
290    }
291
292    #[inline]
293    /// Returns reference to http request
294    pub fn get_ref(&self) -> &Request {
295        &self.req
296    }
297
298    #[inline]
299    /// Returns mut reference to http request
300    pub fn get_mut(&mut self) -> &mut Request {
301        &mut self.req
302    }
303
304    #[inline]
305    /// Ack upgrade request and continue handling process
306    pub fn ack(mut self) -> ControlAck<F> {
307        // Move io into request
308        let io = Rc::new(RequestIoAccess {
309            io: self.io,
310            codec: self.codec,
311        });
312        self.req.head_mut().io = CurrentIo::new(io);
313
314        ControlAck {
315            result: ControlResult::UpgradeAck(self.req),
316        }
317    }
318
319    #[inline]
320    /// Handle upgrade request
321    pub fn handle<H, R, O>(self, f: H) -> ControlAck<F>
322    where
323        H: FnOnce(Request, Io<F>, Codec) -> R + 'static,
324        R: Future<Output = O>,
325    {
326        let io = self.io.take();
327        let _ = crate::rt::spawn(async move {
328            let _ = f(self.req, io, self.codec).await;
329        });
330        ControlAck {
331            result: ControlResult::UpgradeHandled,
332        }
333    }
334
335    #[inline]
336    /// Fail request handling
337    pub fn fail<E: ResponseError>(self, err: E) -> ControlAck<F> {
338        let res: Response = (&err).into();
339        let (res, body) = res.into_parts();
340
341        ControlAck {
342            result: ControlResult::UpgradeFailed(res, body.into()),
343        }
344    }
345
346    #[inline]
347    /// Fail request and send custom response
348    pub fn fail_with(self, res: Response) -> ControlAck<F> {
349        let (res, body) = res.into_parts();
350
351        ControlAck {
352            result: ControlResult::UpgradeFailed(res, body.into()),
353        }
354    }
355}
356
357impl<F> fmt::Debug for Upgrade<F> {
358    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
359        f.debug_struct("Upgrade")
360            .field("req", &self.req)
361            .field("io", &self.io)
362            .field("codec", &self.codec)
363            .finish()
364    }
365}
366
367/// Service disconnect initiated by server
368#[derive(Debug)]
369pub struct ServiceDisconnect(ServiceDisconnectReason);
370
371impl ServiceDisconnect {
372    fn new(reason: ServiceDisconnectReason) -> Self {
373        Self(reason)
374    }
375
376    #[inline]
377    /// Service disconnect reason
378    pub fn reason(&self) -> ServiceDisconnectReason {
379        self.0
380    }
381
382    #[inline]
383    /// Ack controk message
384    pub fn ack<F>(self) -> ControlAck<F> {
385        ControlAck {
386            result: ControlResult::Stop,
387        }
388    }
389}
390
391/// `KeepAlive` control message
392#[derive(Debug)]
393pub struct KeepAlive {
394    enabled: bool,
395}
396
397impl KeepAlive {
398    pub(super) fn new(enabled: bool) -> Self {
399        Self { enabled }
400    }
401
402    #[inline]
403    /// Connection keep-alive is enabled
404    pub fn is_enabled(&self) -> bool {
405        self.enabled
406    }
407
408    #[inline]
409    /// Ack controk message
410    pub fn ack<F>(self) -> ControlAck<F> {
411        ControlAck {
412            result: ControlResult::Stop,
413        }
414    }
415}
416
417/// Service level error
418#[derive(Debug)]
419pub struct Error<Err> {
420    err: Err,
421    pkt: Response,
422}
423
424impl<Err: ResponseError> Error<Err> {
425    fn new(err: Err) -> Self {
426        Self {
427            pkt: err.error_response(),
428            err,
429        }
430    }
431
432    #[inline]
433    /// Returns reference to http error
434    pub fn get_ref(&self) -> &Err {
435        &self.err
436    }
437
438    #[inline]
439    /// Ack service error and close connection.
440    pub fn ack<F>(self) -> ControlAck<F> {
441        let (res, body) = self.pkt.into_parts();
442        ControlAck {
443            result: ControlResult::Error(res, body.into()),
444        }
445    }
446
447    #[inline]
448    /// Fail error handling
449    pub fn fail<E: ResponseError, F>(self, err: E) -> ControlAck<F> {
450        let res: Response = (&err).into();
451        let (res, body) = res.into_parts();
452
453        ControlAck {
454            result: ControlResult::Error(res, body.into()),
455        }
456    }
457
458    #[inline]
459    /// Fail error handling
460    pub fn fail_with<F>(self, res: Response) -> ControlAck<F> {
461        let (res, body) = res.into_parts();
462
463        ControlAck {
464            result: ControlResult::Error(res, body.into()),
465        }
466    }
467}
468
469#[derive(Debug)]
470pub struct ProtocolError(super::ProtocolError);
471
472impl ProtocolError {
473    #[inline]
474    /// Returns error reference
475    pub fn err(&self) -> &super::ProtocolError {
476        &self.0
477    }
478
479    #[inline]
480    /// Ack `ProtocolError` message
481    pub fn ack<F>(self) -> ControlAck<F> {
482        let (res, body) = self.0.error_response().into_parts();
483
484        ControlAck {
485            result: ControlResult::ProtocolError(res, body.into()),
486        }
487    }
488
489    #[inline]
490    /// Fail error handling
491    pub fn fail<E: ResponseError, F>(self, err: E) -> ControlAck<F> {
492        let res: Response = (&err).into();
493        let (res, body) = res.into_parts();
494
495        ControlAck {
496            result: ControlResult::ProtocolError(res, body.into()),
497        }
498    }
499
500    #[inline]
501    /// Fail error handling
502    pub fn fail_with<F>(self, res: Response) -> ControlAck<F> {
503        let (res, body) = res.into_parts();
504
505        ControlAck {
506            result: ControlResult::ProtocolError(res, body.into()),
507        }
508    }
509}
510
511#[derive(Debug)]
512pub struct PeerGone(Option<io::Error>);
513
514impl PeerGone {
515    #[inline]
516    /// Returns error reference
517    pub fn err(&self) -> Option<&io::Error> {
518        self.0.as_ref()
519    }
520
521    #[inline]
522    /// Take error
523    pub fn take(&mut self) -> Option<io::Error> {
524        self.0.take()
525    }
526
527    #[inline]
528    /// Ack `PeerGone` message
529    pub fn ack<F>(self) -> ControlAck<F> {
530        ControlAck {
531            result: ControlResult::Stop,
532        }
533    }
534}
535
536#[derive(Debug)]
537pub struct Expect(Request);
538
539impl Expect {
540    #[inline]
541    /// Returns reference to http request
542    pub fn get_ref(&self) -> &Request {
543        &self.0
544    }
545
546    #[inline]
547    /// Ack expect request
548    pub fn ack<F>(self) -> ControlAck<F> {
549        ControlAck {
550            result: ControlResult::Continue(self.0),
551        }
552    }
553
554    #[inline]
555    /// Fail expect request
556    pub fn fail<E: ResponseError, F>(self, err: E) -> ControlAck<F> {
557        let res: Response = (&err).into();
558        let (res, body) = res.into_parts();
559
560        ControlAck {
561            result: ControlResult::ExpectFailed(res, body.into()),
562        }
563    }
564
565    #[inline]
566    /// Fail expect request and send custom response
567    pub fn fail_with<F>(self, res: Response) -> ControlAck<F> {
568        let (res, body) = res.into_parts();
569
570        ControlAck {
571            result: ControlResult::ExpectFailed(res, body.into()),
572        }
573    }
574}