1use std::{fmt, future::Future, io, rc::Rc};
2
3use crate::http::message::CurrentIo;
4use crate::http::{Request, Response, ResponseError, body::Body, h1::Codec};
5use crate::io::{Filter, Io, IoBoxed, IoRef};
6
7pub enum Control<F, Err> {
8 Connect(Connection<F>),
10 Request(NewRequest),
12 Upgrade(Upgrade<F>),
14 Expect(Expect),
16 Disconnect(Reason<Err>),
18}
19
20#[derive(Debug)]
21pub enum Reason<Err> {
23 Service(ServiceDisconnect),
25 Error(Error<Err>),
27 ProtocolError(ProtocolError),
29 PeerGone(PeerGone),
31 KeepAlive(KeepAlive),
33}
34
35#[derive(Copy, Clone, Debug, PartialEq, Eq)]
36pub enum ServiceDisconnectReason {
37 Shutdown,
39 UpgradeHandled,
41 UpgradeFailed,
43 ExpectFailed,
45 PayloadDropped,
47}
48
49#[derive(Debug)]
51pub struct ControlAck<F> {
52 pub(super) result: ControlResult<F>,
53}
54
55#[derive(Debug)]
56pub(super) enum ControlResult<F> {
57 Connect(Io<F>),
59 Continue(Request),
61 Expect(Request),
63 Upgrade(Request),
65 UpgradeAck(Request),
67 UpgradeHandled,
69 Publish(Request),
71 Response(Response<()>, Body),
73 Error(Response<()>, Body),
75 ProtocolError(Response<()>, Body),
77 UpgradeFailed(Response<()>, Body),
79 ExpectFailed(Response<()>, Body),
81 Stop,
83}
84
85impl<F, Err> Control<F, Err> {
86 pub(super) fn connect(id: usize, io: Io<F>) -> Self {
87 Control::Connect(Connection { id, io })
88 }
89
90 pub(super) fn request(req: Request) -> Self {
91 Control::Request(NewRequest(req))
92 }
93
94 pub(super) fn upgrade(req: Request, io: Rc<Io<F>>, codec: Codec) -> Self {
95 Control::Upgrade(Upgrade { req, io, codec })
96 }
97
98 pub(super) fn expect(req: Request) -> Self {
99 Control::Expect(Expect(req))
100 }
101
102 pub(super) fn err(err: Err) -> Self
103 where
104 Err: ResponseError,
105 {
106 Control::Disconnect(Reason::Error(Error::new(err)))
107 }
108
109 pub(super) fn peer_gone(err: Option<io::Error>) -> Self {
110 Control::Disconnect(Reason::PeerGone(PeerGone(err)))
111 }
112
113 pub(super) fn proto_err(err: super::ProtocolError) -> Self {
114 Control::Disconnect(Reason::ProtocolError(ProtocolError(err)))
115 }
116
117 pub(super) fn keepalive(enabled: bool) -> Self {
118 Control::Disconnect(Reason::KeepAlive(KeepAlive::new(enabled)))
119 }
120
121 pub(super) fn svc_disconnect(reason: ServiceDisconnectReason) -> Self {
122 Control::Disconnect(Reason::Service(ServiceDisconnect::new(reason)))
123 }
124
125 #[inline]
126 pub fn ack(self) -> ControlAck<F>
128 where
129 F: Filter,
130 Err: ResponseError,
131 {
132 match self {
133 Control::Connect(msg) => msg.ack(),
134 Control::Request(msg) => msg.ack(),
135 Control::Upgrade(msg) => msg.ack(),
136 Control::Expect(msg) => msg.ack(),
137 Control::Disconnect(msg) => msg.ack(),
138 }
139 }
140}
141
142impl<F, Err> fmt::Debug for Control<F, Err>
143where
144 Err: fmt::Debug,
145{
146 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
147 match self {
148 Control::Connect(_) => f.debug_tuple("Control::Connect").finish(),
149 Control::Request(msg) => f.debug_tuple("Control::Request").field(msg).finish(),
150 Control::Upgrade(msg) => f.debug_tuple("Control::Upgrade").field(msg).finish(),
151 Control::Expect(msg) => f.debug_tuple("Control::Expect").field(msg).finish(),
152 Control::Disconnect(msg) => {
153 f.debug_tuple("Control::Disconnect").field(msg).finish()
154 }
155 }
156 }
157}
158
159impl<Err: ResponseError> Reason<Err> {
160 pub fn ack<F>(self) -> ControlAck<F> {
161 match self {
162 Reason::Error(msg) => msg.ack(),
163 Reason::ProtocolError(msg) => msg.ack(),
164 Reason::PeerGone(msg) => msg.ack(),
165 Reason::KeepAlive(msg) => msg.ack(),
166 Reason::Service(msg) => msg.ack(),
167 }
168 }
169}
170
171#[derive(Debug)]
172pub struct Connection<F> {
173 id: usize,
174 io: Io<F>,
175}
176
177impl<F> Connection<F> {
178 #[inline]
179 pub fn id(self) -> usize {
180 self.id
181 }
182
183 #[inline]
184 pub fn get_ref(&self) -> &Io<F> {
186 &self.io
187 }
188
189 #[inline]
190 pub fn get_mut(&mut self) -> &mut Io<F> {
192 &mut self.io
193 }
194
195 #[inline]
196 pub fn ack(self) -> ControlAck<F> {
198 ControlAck {
199 result: ControlResult::Connect(self.io),
200 }
201 }
202}
203
204#[derive(Debug)]
205pub struct NewRequest(Request);
206
207impl NewRequest {
208 #[inline]
209 pub fn get_ref(&self) -> &Request {
211 &self.0
212 }
213
214 #[inline]
215 pub fn get_mut(&mut self) -> &mut Request {
217 &mut self.0
218 }
219
220 #[inline]
221 pub fn ack<F>(self) -> ControlAck<F> {
223 let result = if self.0.head().expect() {
224 ControlResult::Expect(self.0)
225 } else if self.0.upgrade() {
226 ControlResult::Upgrade(self.0)
227 } else {
228 ControlResult::Publish(self.0)
229 };
230 ControlAck { result }
231 }
232
233 #[inline]
234 pub fn fail<E: ResponseError, F>(self, err: E) -> ControlAck<F> {
236 let res: Response = (&err).into();
237 let (res, body) = res.into_parts();
238
239 ControlAck {
240 result: ControlResult::Response(res, body.into()),
241 }
242 }
243
244 #[inline]
245 pub fn fail_with<F>(self, res: Response) -> ControlAck<F> {
247 let (res, body) = res.into_parts();
248
249 ControlAck {
250 result: ControlResult::Response(res, body.into()),
251 }
252 }
253}
254
255pub struct Upgrade<F> {
256 req: Request,
257 io: Rc<Io<F>>,
258 codec: Codec,
259}
260
261struct RequestIoAccess<F> {
262 io: Rc<Io<F>>,
263 codec: Codec,
264}
265
266impl<F> fmt::Debug for RequestIoAccess<F> {
267 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
268 f.debug_struct("RequestIoAccess")
269 .field("io", self.io.as_ref())
270 .field("codec", &self.codec)
271 .finish()
272 }
273}
274
275impl<F: Filter> crate::http::message::IoAccess for RequestIoAccess<F> {
276 fn get(&self) -> Option<&IoRef> {
277 Some(self.io.as_ref())
278 }
279
280 fn take(&self) -> Option<(IoBoxed, Codec)> {
281 Some((self.io.take().into(), self.codec.clone()))
282 }
283}
284
285impl<F: Filter> Upgrade<F> {
286 #[inline]
287 pub fn io(&self) -> &Io<F> {
289 &self.io
290 }
291
292 #[inline]
293 pub fn get_ref(&self) -> &Request {
295 &self.req
296 }
297
298 #[inline]
299 pub fn get_mut(&mut self) -> &mut Request {
301 &mut self.req
302 }
303
304 #[inline]
305 pub fn ack(mut self) -> ControlAck<F> {
307 let io = Rc::new(RequestIoAccess {
309 io: self.io,
310 codec: self.codec,
311 });
312 self.req.head_mut().io = CurrentIo::new(io);
313
314 ControlAck {
315 result: ControlResult::UpgradeAck(self.req),
316 }
317 }
318
319 #[inline]
320 pub fn handle<H, R, O>(self, f: H) -> ControlAck<F>
322 where
323 H: FnOnce(Request, Io<F>, Codec) -> R + 'static,
324 R: Future<Output = O>,
325 {
326 let io = self.io.take();
327 let _ = crate::rt::spawn(async move {
328 let _ = f(self.req, io, self.codec).await;
329 });
330 ControlAck {
331 result: ControlResult::UpgradeHandled,
332 }
333 }
334
335 #[inline]
336 pub fn fail<E: ResponseError>(self, err: E) -> ControlAck<F> {
338 let res: Response = (&err).into();
339 let (res, body) = res.into_parts();
340
341 ControlAck {
342 result: ControlResult::UpgradeFailed(res, body.into()),
343 }
344 }
345
346 #[inline]
347 pub fn fail_with(self, res: Response) -> ControlAck<F> {
349 let (res, body) = res.into_parts();
350
351 ControlAck {
352 result: ControlResult::UpgradeFailed(res, body.into()),
353 }
354 }
355}
356
357impl<F> fmt::Debug for Upgrade<F> {
358 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
359 f.debug_struct("Upgrade")
360 .field("req", &self.req)
361 .field("io", &self.io)
362 .field("codec", &self.codec)
363 .finish()
364 }
365}
366
367#[derive(Debug)]
369pub struct ServiceDisconnect(ServiceDisconnectReason);
370
371impl ServiceDisconnect {
372 fn new(reason: ServiceDisconnectReason) -> Self {
373 Self(reason)
374 }
375
376 #[inline]
377 pub fn reason(&self) -> ServiceDisconnectReason {
379 self.0
380 }
381
382 #[inline]
383 pub fn ack<F>(self) -> ControlAck<F> {
385 ControlAck {
386 result: ControlResult::Stop,
387 }
388 }
389}
390
391#[derive(Debug)]
393pub struct KeepAlive {
394 enabled: bool,
395}
396
397impl KeepAlive {
398 pub(super) fn new(enabled: bool) -> Self {
399 Self { enabled }
400 }
401
402 #[inline]
403 pub fn is_enabled(&self) -> bool {
405 self.enabled
406 }
407
408 #[inline]
409 pub fn ack<F>(self) -> ControlAck<F> {
411 ControlAck {
412 result: ControlResult::Stop,
413 }
414 }
415}
416
417#[derive(Debug)]
419pub struct Error<Err> {
420 err: Err,
421 pkt: Response,
422}
423
424impl<Err: ResponseError> Error<Err> {
425 fn new(err: Err) -> Self {
426 Self {
427 pkt: err.error_response(),
428 err,
429 }
430 }
431
432 #[inline]
433 pub fn get_ref(&self) -> &Err {
435 &self.err
436 }
437
438 #[inline]
439 pub fn ack<F>(self) -> ControlAck<F> {
441 let (res, body) = self.pkt.into_parts();
442 ControlAck {
443 result: ControlResult::Error(res, body.into()),
444 }
445 }
446
447 #[inline]
448 pub fn fail<E: ResponseError, F>(self, err: E) -> ControlAck<F> {
450 let res: Response = (&err).into();
451 let (res, body) = res.into_parts();
452
453 ControlAck {
454 result: ControlResult::Error(res, body.into()),
455 }
456 }
457
458 #[inline]
459 pub fn fail_with<F>(self, res: Response) -> ControlAck<F> {
461 let (res, body) = res.into_parts();
462
463 ControlAck {
464 result: ControlResult::Error(res, body.into()),
465 }
466 }
467}
468
469#[derive(Debug)]
470pub struct ProtocolError(super::ProtocolError);
471
472impl ProtocolError {
473 #[inline]
474 pub fn err(&self) -> &super::ProtocolError {
476 &self.0
477 }
478
479 #[inline]
480 pub fn ack<F>(self) -> ControlAck<F> {
482 let (res, body) = self.0.error_response().into_parts();
483
484 ControlAck {
485 result: ControlResult::ProtocolError(res, body.into()),
486 }
487 }
488
489 #[inline]
490 pub fn fail<E: ResponseError, F>(self, err: E) -> ControlAck<F> {
492 let res: Response = (&err).into();
493 let (res, body) = res.into_parts();
494
495 ControlAck {
496 result: ControlResult::ProtocolError(res, body.into()),
497 }
498 }
499
500 #[inline]
501 pub fn fail_with<F>(self, res: Response) -> ControlAck<F> {
503 let (res, body) = res.into_parts();
504
505 ControlAck {
506 result: ControlResult::ProtocolError(res, body.into()),
507 }
508 }
509}
510
511#[derive(Debug)]
512pub struct PeerGone(Option<io::Error>);
513
514impl PeerGone {
515 #[inline]
516 pub fn err(&self) -> Option<&io::Error> {
518 self.0.as_ref()
519 }
520
521 #[inline]
522 pub fn take(&mut self) -> Option<io::Error> {
524 self.0.take()
525 }
526
527 #[inline]
528 pub fn ack<F>(self) -> ControlAck<F> {
530 ControlAck {
531 result: ControlResult::Stop,
532 }
533 }
534}
535
536#[derive(Debug)]
537pub struct Expect(Request);
538
539impl Expect {
540 #[inline]
541 pub fn get_ref(&self) -> &Request {
543 &self.0
544 }
545
546 #[inline]
547 pub fn ack<F>(self) -> ControlAck<F> {
549 ControlAck {
550 result: ControlResult::Continue(self.0),
551 }
552 }
553
554 #[inline]
555 pub fn fail<E: ResponseError, F>(self, err: E) -> ControlAck<F> {
557 let res: Response = (&err).into();
558 let (res, body) = res.into_parts();
559
560 ControlAck {
561 result: ControlResult::ExpectFailed(res, body.into()),
562 }
563 }
564
565 #[inline]
566 pub fn fail_with<F>(self, res: Response) -> ControlAck<F> {
568 let (res, body) = res.into_parts();
569
570 ControlAck {
571 result: ControlResult::ExpectFailed(res, body.into()),
572 }
573 }
574}