1use std::{
8 cell::{Cell, RefCell},
9 convert::TryFrom,
10 marker::PhantomData,
11 ops::Deref,
12 rc::Rc,
13 task::{Poll, Waker},
14};
15
16use futures::{Stream, StreamExt};
17
18use crate::{
19 host::Host,
20 reactor::http::{HttpReactor, WakerId},
21 types::HttpCid,
22};
23use crate::{
24 http_constants::{
25 DEFAULT_PATH, HEADER_AUTHORITY, HEADER_METHOD, HEADER_PATH, HEADER_SCHEME, HEADER_STATUS,
26 },
27 reactor::http::ExchangePhase,
28};
29
/// Private support items: a `Sealed` marker used as a supertrait of `Event`, and the
/// `BodyAccessor` routines used by body events.
mod private {
    use crate::host::Host;

    /// Marker supertrait of `Event`. In this file it is implemented only by the
    /// `finite_events!` macro.
    pub trait Sealed {}

    /// Associated body read/write routines for a body event type.
    ///
    /// The implementors in this file forward every argument unchanged to the matching
    /// request or response body function of the `Host`.
    pub trait BodyAccessor {
        /// Reads body bytes through `host`, passing `offset` and `max_size` on as given.
        /// Returns whatever the host getter returns.
        fn read_body(host: &dyn Host, offset: usize, max_size: usize) -> Option<Vec<u8>>;

        /// Writes `value` through `host`, passing `offset` and `size` on as given.
        fn write_body(host: &dyn Host, offset: usize, size: usize, value: &[u8]);
    }
}
41
42use private::{BodyAccessor, Sealed};
43
/// A typed event of an HTTP exchange.
///
/// Each event converts into a [`FiniteEvent`] and can be recovered from one through
/// `TryFrom`. The `Sealed` supertrait comes from the private module of this file.
pub trait Event: Sealed + Clone + Into<FiniteEvent> + TryFrom<FiniteEvent> + Unpin {
    /// The [`EventKind`] tag that corresponds to this event type.
    fn kind() -> EventKind;

    /// Body size carried by the event. The macro-generated impls return the `body_size`
    /// field for `RequestBody`/`ResponseBody` and `0` for every other event.
    fn body_size(&self) -> usize;

    /// Whether processing should be paused after this event. The macro-generated impls
    /// return `true` only for body events whose `end_of_stream` flag is not set.
    fn should_pause(&self) -> bool;
}
52
/// Marker: `Self` occurs after `S`. Implemented by the `impl_after!` macro for every
/// pair of events, following the order of the `finite_events!` list.
pub trait After<S: Event>: Event {}

/// Marker: `Self` occurs before `S`. Used as the bound of the `wait_for_*` methods of
/// `Exchange`.
pub trait Before<S: Event>: Event {}

// `Before` is derived from `After`: `A` is before `B` exactly when `B` is after `A`.
impl<A, B> Before<B> for A
where
    B: After<A>,
    A: Event,
{
}
65
/// An event that carries body data and therefore has associated `BodyAccessor` routines.
pub trait BodyEvent: Event + BodyAccessor {
    /// Returns the event's end-of-stream flag.
    fn end_of_stream(&self) -> bool;
}

/// Marker for header-like events; it is the bound required by `Exchange::event_data`.
pub trait HeadersEvent: Event {}
73
/// First event of the `finite_events!` list and the default event parameter of `Exchange`.
#[derive(Clone, Debug)]
pub struct Start {
    // Not read in this file (hence the leading underscore).
    // NOTE(review): presumably the id of the HTTP context that started the exchange — confirm.
    pub(crate) _context_id: HttpCid,
}
79
/// Request headers event.
#[derive(Clone, Debug)]
pub struct RequestHeaders {
    // Not read in this file (hence the leading underscore).
    // NOTE(review): presumably the header count reported by the host — confirm.
    pub(crate) _num_headers: usize,
    // Not consulted by the generated `should_pause`, which is always `false` for
    // header events.
    pub(crate) end_of_stream: bool,
}

// Marks request headers as usable with `Exchange::event_data`.
impl HeadersEvent for RequestHeaders {}
88
/// Request body event.
#[derive(Clone, Debug)]
pub struct RequestBody {
    // Returned by the generated `Event::body_size` for this event.
    pub(crate) body_size: usize,
    // Returned by `BodyEvent::end_of_stream`; while `false`, the generated
    // `Event::should_pause` returns `true`.
    pub(crate) end_of_stream: bool,
}

impl BodyEvent for RequestBody {
    /// Returns the `end_of_stream` field.
    fn end_of_stream(&self) -> bool {
        self.end_of_stream
    }
}

// Routes body access to the host's HTTP *request* body functions.
impl BodyAccessor for RequestBody {
    fn read_body(host: &dyn Host, offset: usize, max_size: usize) -> Option<Vec<u8>> {
        host.get_http_request_body(offset, max_size)
    }

    fn write_body(host: &dyn Host, offset: usize, size: usize, value: &[u8]) {
        host.set_http_request_body(offset, size, value)
    }
}
111
/// Request header access from a request body stream. Compiled only when the
/// `experimental_enable_stop_iteration` feature is enabled. Every method forwards to the
/// corresponding request header function of the host.
#[cfg(feature = "experimental_enable_stop_iteration")]
impl HeadersAccessor for EventDataStream<'_, RequestBody> {
    fn header(&self, name: &str) -> Option<String> {
        let host = &self.exchange.host;
        host.get_http_request_header(name)
    }

    fn headers(&self) -> Vec<(String, String)> {
        let host = &self.exchange.host;
        host.get_http_request_headers()
    }

    fn add_header(&self, name: &str, value: &str) {
        let host = &self.exchange.host;
        host.add_http_request_header(name, value);
    }

    fn set_header(&self, name: &str, value: &str) {
        let host = &self.exchange.host;
        host.set_http_request_header(name, Some(value));
    }

    fn set_headers(&self, headers: Vec<(&str, &str)>) {
        let host = &self.exchange.host;
        host.set_http_request_headers(headers);
    }

    fn remove_header(&self, name: &str) {
        // Removal is expressed to the host as setting the header to `None`.
        let host = &self.exchange.host;
        host.set_http_request_header(name, None);
    }
}
140
/// Response header access from a response body stream. Compiled only when the
/// `experimental_enable_stop_iteration` feature is enabled. Every method forwards to the
/// corresponding response header function of the host.
#[cfg(feature = "experimental_enable_stop_iteration")]
impl HeadersAccessor for EventDataStream<'_, ResponseBody> {
    fn header(&self, name: &str) -> Option<String> {
        let host = &self.exchange.host;
        host.get_http_response_header(name)
    }

    fn headers(&self) -> Vec<(String, String)> {
        let host = &self.exchange.host;
        host.get_http_response_headers()
    }

    fn add_header(&self, name: &str, value: &str) {
        let host = &self.exchange.host;
        host.add_http_response_header(name, value);
    }

    fn set_header(&self, name: &str, value: &str) {
        let host = &self.exchange.host;
        host.set_http_response_header(name, Some(value));
    }

    fn set_headers(&self, headers: Vec<(&str, &str)>) {
        let host = &self.exchange.host;
        host.set_http_response_headers(headers);
    }

    fn remove_header(&self, name: &str) {
        // Removal is expressed to the host as setting the header to `None`.
        let host = &self.exchange.host;
        host.set_http_response_header(name, None);
    }
}
169
/// Request trailers event.
#[derive(Clone, Debug)]
pub struct RequestTrailers {
    // Not read in this file (hence the leading underscore).
    // NOTE(review): presumably the trailer count reported by the host — confirm.
    pub(crate) _num_trailers: usize,
}

// Marks request trailers as usable with `Exchange::event_data`.
impl HeadersEvent for RequestTrailers {}
177
/// Response headers event.
#[derive(Clone, Debug)]
pub struct ResponseHeaders {
    // Not read in this file (hence the leading underscore).
    // NOTE(review): presumably the header count reported by the host — confirm.
    pub(crate) _num_headers: usize,
    // Not consulted by the generated `should_pause`, which is always `false` for
    // header events.
    pub(crate) end_of_stream: bool,
}

// Marks response headers as usable with `Exchange::event_data`.
impl HeadersEvent for ResponseHeaders {}
186
/// Response body event.
#[derive(Clone, Debug)]
pub struct ResponseBody {
    // Returned by the generated `Event::body_size` for this event.
    pub(crate) body_size: usize,
    // Returned by `BodyEvent::end_of_stream`; while `false`, the generated
    // `Event::should_pause` returns `true`.
    pub(crate) end_of_stream: bool,
}

impl BodyEvent for ResponseBody {
    /// Returns the `end_of_stream` field.
    fn end_of_stream(&self) -> bool {
        self.end_of_stream
    }
}

// Routes body access to the host's HTTP *response* body functions.
impl BodyAccessor for ResponseBody {
    fn read_body(host: &dyn Host, offset: usize, max_size: usize) -> Option<Vec<u8>> {
        host.get_http_response_body(offset, max_size)
    }

    fn write_body(host: &dyn Host, offset: usize, size: usize, value: &[u8]) {
        host.set_http_response_body(offset, size, value)
    }
}
209
/// Response trailers event.
#[derive(Clone, Debug)]
pub struct ResponseTrailers {
    // Not read in this file (hence the leading underscore).
    // NOTE(review): presumably the trailer count reported by the host — confirm.
    pub(crate) _num_trailers: usize,
}

// Marks response trailers as usable with `Exchange::event_data`.
impl HeadersEvent for ResponseTrailers {}
217
/// Last event of the `finite_events!` list; it carries no data.
#[derive(Clone, Debug)]
pub struct ExchangeComplete {}
221
// Expands to the body of `Event::should_pause` for the given event type name.
// Body events pause until `end_of_stream` is set; every other event never pauses.
// The two literal arms must stay ahead of the `$event:ty` arm, which would match
// those names as well.
macro_rules! should_pause {
    (RequestBody, $value:expr) => {
        !$value.end_of_stream
    };

    (ResponseBody, $value:expr) => {
        !$value.end_of_stream
    };

    // Fallback for all non-body events.
    ($event:ty, $value:expr) => {
        false
    };
}
235
// Expands to the body of `Event::body_size` for the given event type name.
// Body events report their `body_size` field; every other event reports `0`.
// The two literal arms must stay ahead of the `$event:ty` arm, which would match
// those names as well.
macro_rules! body_size {
    (RequestBody, $value:expr) => {
        $value.body_size
    };

    (ResponseBody, $value:expr) => {
        $value.body_size
    };

    // Fallback for all non-body events.
    ($event:ty, $value:expr) => {
        0
    };
}
249
// Given a comma-separated list of event types, emits `After` impls so that each type is
// `After` every type that follows it in the list. The `after!` macro calls this with the
// event list reversed, i.e. latest event first.
macro_rules! impl_after {

    // Base case: a single remaining type comes after nothing.
    ($event:ty) => {};

    // The head comes after every type in the tail; then recurse on the tail.
    ($event:ty, $($after:ty),*) => {
        $(impl After<$after> for $event {})*
        impl_after!($($after),*);
    };
}
260
// Reverses a bracketed token list using an accumulator, then hands the reversed list to
// `impl_after!`. Invoked as `after!([A B C]);`.
macro_rules! after {

    // Input exhausted: the accumulator now holds the list in reverse order.
    ([] $($reversed:tt)*) => {
        impl_after!($($reversed),*);
    };

    // Move the first remaining token to the front of the accumulator.
    ([$first:tt $($rest:tt)*] $($reversed:tt)*) => {
        after!([$($rest)*] $first $($reversed)*);
    };
}
272
// Generates the event plumbing from an ordered, comma-terminated list of event type names:
// the `EventKind` and `FiniteEvent` enums, the `After` relations, and for each type the
// `Sealed`, `Event`, `From<_> for FiniteEvent` and `TryFrom<FiniteEvent>` impls.
macro_rules! finite_events {

    ($($event:ident,)+) => {

        /// Field-less tag with one variant per event type, declared in list order.
        #[derive(Clone, Copy, PartialEq, Eq, Debug)]
        pub enum EventKind {
            $($event),+
        }

        /// Sum type holding any one of the event values.
        #[derive(Clone, Debug)]
        pub enum FiniteEvent {
            $($event($event)),+
        }

        impl FiniteEvent {
            /// Returns the `EventKind` tag matching the held variant.
            pub fn kind(&self) -> EventKind {
                match self {
                    $(Self::$event(_) => EventKind::$event),+
                }
            }
        }

        // Derive the `After` (and, via the blanket impl, `Before`) relations from list order.
        after!([$($event)*]);

        $(
            impl Sealed for $event {}

            impl Event for $event {
                fn kind() -> EventKind {
                    EventKind::$event
                }

                fn should_pause(&self) -> bool {
                    should_pause!($event, self)
                }

                fn body_size(&self) -> usize {
                    body_size!($event, self)
                }
            }

            impl From<$event> for FiniteEvent {
                fn from(event: $event) -> Self {
                    Self::$event(event)
                }
            }

            // A mismatching variant is handed back unchanged as the error value.
            impl TryFrom<FiniteEvent> for $event {
                type Error = FiniteEvent;

                fn try_from(finite_event: FiniteEvent) -> Result<Self, FiniteEvent> {
                    match finite_event {
                        FiniteEvent::$event(e) => Ok(e),
                        e => Err(e),
                    }
                }
            }
        )*
    };
}
335
// The order of this list is significant: it fixes the `EventKind` discriminants (which
// drive the `Ord` impl for `EventKind`) and the generated `After`/`Before` relations.
finite_events! {
    Start,
    RequestHeaders,
    RequestBody,
    RequestTrailers,
    ResponseHeaders,
    ResponseBody,
    ResponseTrailers,
    ExchangeComplete,
}
346
347impl PartialOrd for EventKind {
348 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
349 Some(self.cmp(other))
350 }
351}
352
353impl Ord for EventKind {
354 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
355 self.index().cmp(&other.index())
356 }
357}
358
359impl EventKind {
360 fn index(&self) -> usize {
361 *self as usize
362 }
363}
364
365struct InnerExchange<S: Event> {
366 reactor: Rc<HttpReactor>,
367 host: Rc<dyn Host>,
368 first_event: RefCell<Option<S>>,
369 empty_stream: Cell<bool>,
370 offset: Cell<usize>,
371 event_count: Cell<usize>,
372}
373
374impl<S: Event> InnerExchange<S> {
375 fn take_first_event(&self) -> Option<S> {
376 self.first_event.borrow_mut().take()
377 }
378}
379
/// Handle to an HTTP exchange, typed by the event `S` it is positioned at (`Start` by
/// default).
///
/// The `Drop` impl in this file resumes the paused request or response when the handle
/// goes away, unless the reactor is done or the request was cancelled.
pub struct Exchange<S: Event = Start> {
    pub(crate) reactor: Rc<HttpReactor>,
    pub(crate) host: Rc<dyn Host>,
    // State shared with the `EventData` and `EventDataStream` values created from this handle.
    inner: Rc<InnerExchange<S>>,
}
386
387pub struct EventData<'a, S: Event> {
389 exchange: Rc<InnerExchange<S>>,
390 pub(crate) event: S,
391 offset: usize,
392 _lifetime: PhantomData<&'a ()>,
393}
394
395impl<'a, S: Event> EventData<'a, S> {
396 pub(crate) fn new(exchange: &'a Exchange<S>, event: S, offset: usize) -> Self {
397 Self {
398 exchange: exchange.inner.clone(),
399 event,
400 offset,
401 _lifetime: PhantomData,
402 }
403 }
404
405 fn from_inner(exchange: Rc<InnerExchange<S>>, event: S, offset: usize) -> Self {
406 Self {
407 exchange,
408 event,
409 offset,
410 _lifetime: PhantomData,
411 }
412 }
413}
414
/// Read/write access to a set of HTTP headers.
///
/// The implementations in this file forward each call to the matching request or
/// response header function of the host.
pub trait HeadersAccessor {
    /// Looks up the header called `name`; returns whatever the host lookup returns.
    fn header(&self, name: &str) -> Option<String>;

    /// Returns the headers as `(name, value)` pairs.
    fn headers(&self) -> Vec<(String, String)>;

    /// Adds a header with the given `name` and `value`.
    fn add_header(&self, name: &str, value: &str);

    /// Sets the header `name` to `value`.
    fn set_header(&self, name: &str, value: &str);

    /// Passes the whole `headers` list to the host's set-headers function.
    fn set_headers(&self, headers: Vec<(&str, &str)>);

    /// Removes the header called `name`.
    fn remove_header(&self, name: &str);
}
439
440impl EventData<'_, RequestHeaders> {
441 pub fn method(&self) -> String {
442 self.header(HEADER_METHOD).unwrap_or_default()
443 }
444
445 pub fn scheme(&self) -> String {
446 self.header(HEADER_SCHEME).unwrap_or_default()
447 }
448
449 pub fn authority(&self) -> String {
450 self.header(HEADER_AUTHORITY).unwrap_or_default()
451 }
452
453 pub fn path(&self) -> String {
454 self.header(HEADER_PATH)
455 .unwrap_or_else(|| DEFAULT_PATH.to_string())
456 }
457}
458
459impl EventData<'_, ResponseHeaders> {
460 pub fn status_code(&self) -> u32 {
461 self.header(HEADER_STATUS)
462 .and_then(|status| status.parse::<u32>().ok())
463 .unwrap_or_default()
464 }
465}
466
467impl HeadersAccessor for EventData<'_, RequestHeaders> {
468 fn header(&self, name: &str) -> Option<String> {
469 self.exchange.host.get_http_request_header(name)
470 }
471
472 fn headers(&self) -> Vec<(String, String)> {
473 self.exchange.host.get_http_request_headers()
474 }
475
476 fn add_header(&self, name: &str, value: &str) {
477 self.exchange.host.add_http_request_header(name, value);
478 }
479
480 fn set_header(&self, name: &str, value: &str) {
481 self.exchange
482 .host
483 .set_http_request_header(name, Some(value));
484 }
485
486 fn set_headers(&self, headers: Vec<(&str, &str)>) {
487 self.exchange.host.set_http_request_headers(headers);
488 }
489
490 fn remove_header(&self, name: &str) {
491 self.exchange.host.set_http_request_header(name, None);
492 }
493}
494
495impl EventData<'_, RequestTrailers> {
496 pub fn header(&self, name: &str) -> Option<String> {
497 self.exchange.host.get_http_request_trailer(name)
498 }
499
500 pub fn headers(&self) -> Vec<(String, String)> {
501 self.exchange.host.get_http_request_trailers()
502 }
503}
504
505impl HeadersAccessor for EventData<'_, ResponseHeaders> {
506 fn header(&self, name: &str) -> Option<String> {
507 self.exchange.host.get_http_response_header(name)
508 }
509
510 fn headers(&self) -> Vec<(String, String)> {
511 self.exchange.host.get_http_response_headers()
512 }
513
514 fn add_header(&self, name: &str, value: &str) {
515 self.exchange.host.add_http_response_header(name, value);
516 }
517
518 fn set_header(&self, name: &str, value: &str) {
519 self.exchange
520 .host
521 .set_http_response_header(name, Some(value));
522 }
523
524 fn set_headers(&self, headers: Vec<(&str, &str)>) {
525 self.exchange.host.set_http_response_headers(headers);
526 }
527
528 fn remove_header(&self, name: &str) {
529 self.exchange.host.set_http_response_header(name, None);
530 }
531}
532
533impl<S: Event> Exchange<S> {
534 pub(crate) fn new(
535 reactor: Rc<HttpReactor>,
536 host: Rc<dyn Host>,
537 first_event: Option<S>,
538 ) -> Self {
539 let empty_stream = first_event
540 .as_ref()
541 .map(|e| !e.should_pause())
542 .unwrap_or(false);
543 Self {
544 reactor: reactor.clone(),
545 host: host.clone(),
546 inner: Rc::new(InnerExchange {
547 reactor,
548 host,
549 first_event: RefCell::new(first_event),
550 empty_stream: Cell::new(empty_stream),
551 event_count: Cell::new(0),
552 offset: Cell::new(0),
553 }),
554 }
555 }
556
557 pub fn event_data(&self) -> Option<EventData<S>>
558 where
559 S: HeadersEvent,
560 {
561 let finite_event = self.reactor.cloned_finite_event();
562 S::try_from(finite_event)
563 .ok()
564 .map(|e| EventData::new(self, e, 0))
565 }
566
567 #[must_use]
568 pub fn event_data_stream(&self) -> EventDataStream<S>
569 where
570 S: BodyEvent,
571 {
572 EventDataStream {
573 id_and_waker: None,
574 exchange: self.inner.clone(),
575 buffering: true,
576 _lifetime: PhantomData,
577 }
578 }
579
580 pub(crate) fn static_event_data_stream(&self) -> EventDataStream<'static, S>
581 where
582 S: BodyEvent,
583 {
584 EventDataStream {
585 id_and_waker: None,
586 exchange: self.inner.clone(),
587 buffering: false,
588 _lifetime: PhantomData,
589 }
590 }
591
592 pub(crate) async fn wait_for_event<E>(self) -> Exchange<E>
593 where
594 E: Event,
595 S: Before<E>,
596 {
597 self.wait_for_event_buffering(true).await
598 }
599
600 pub(crate) async fn wait_for_event_buffering<E>(self, buffering: bool) -> Exchange<E>
601 where
602 E: Event,
603 S: Before<E>,
604 {
605 let exchange: Exchange<E> =
606 Exchange::new(Rc::clone(&self.reactor), Rc::clone(&self.host), None);
607
608 drop(self);
610
611 let mut stream = EventDataStream {
612 id_and_waker: None,
613 exchange: exchange.inner.clone(),
614 buffering,
615 _lifetime: PhantomData,
616 };
617 let first_event = stream.next().await.map(|ed| ed.event);
618
619 *exchange.inner.first_event.borrow_mut() = first_event;
620 exchange.inner.offset.set(0);
621 exchange.inner.empty_stream.set(false);
622
623 exchange
624 }
625
626 pub async fn wait_for_request_headers(self) -> Exchange<RequestHeaders>
627 where
628 S: Before<RequestHeaders>,
629 {
630 self.wait_for_event().await
631 }
632
633 pub async fn wait_for_request_body(self) -> Exchange<RequestBody>
634 where
635 S: Before<RequestBody>,
636 {
637 self.wait_for_event().await
638 }
639
640 pub(crate) async fn _wait_for_request_trailers(self) -> Exchange<RequestTrailers>
641 where
642 S: Before<RequestTrailers>,
643 {
644 self.wait_for_event().await
645 }
646
647 pub async fn wait_for_response_headers(self) -> Exchange<ResponseHeaders>
648 where
649 S: Before<ResponseHeaders>,
650 {
651 self.wait_for_event().await
652 }
653
654 pub async fn wait_for_response_body(self) -> Exchange<ResponseBody>
655 where
656 S: Before<ResponseBody>,
657 {
658 self.wait_for_event().await
659 }
660
661 pub(crate) async fn _wait_for_response_trailers(self) -> Exchange<ResponseTrailers>
662 where
663 S: Before<ResponseTrailers>,
664 {
665 self.wait_for_event().await
666 }
667
668 pub(crate) async fn _wait_for_exchange_complete(self) -> Exchange<ExchangeComplete>
669 where
670 S: Before<ExchangeComplete>,
671 {
672 self.wait_for_event().await
673 }
674
675 pub fn send_response(self, status_code: u32, headers: Vec<(&str, &str)>, body: Option<&[u8]>)
676 where
677 S: After<Start> + Before<ResponseHeaders>,
678 {
679 self.host
680 .set_effective_context(self.reactor.context_id().into());
681 self.reactor.set_paused(true);
682 self.reactor.cancel_request();
683 self.host.send_http_response(status_code, headers, body);
684 }
685}
686
687impl<S: Event> Drop for Exchange<S> {
688 fn drop(&mut self) {
689 let reactor = &self.reactor;
690 let host = &self.host;
691 if !reactor.is_done() && reactor.paused() && !reactor.cancelled_request() {
692 reactor.set_paused(false);
693
694 host.set_effective_context(reactor.context_id().into());
695
696 match reactor.phase() {
697 ExchangePhase::Request => host.resume_http_request(),
698 ExchangePhase::Response => host.resume_http_response(),
699 }
700 }
701 }
702}
703
704impl<S> EventData<'_, S>
705where
706 S: BodyEvent,
707{
708 pub fn offset(&self) -> usize {
709 self.offset
710 }
711
712 pub fn chunk_size(&self) -> usize {
713 self.event.body_size()
714 }
715
716 pub fn read_body(&self, offset: usize, max_size: usize) -> Vec<u8> {
717 S::read_body(self.exchange.host.deref(), offset, max_size).unwrap_or_default()
718 }
719
720 pub fn read_chunk(&self) -> Vec<u8> {
721 self.read_body(self.offset, self.event.body_size())
722 }
723
724 pub fn read_payload(&self) -> Vec<u8> {
725 self.read_body(0, self.event.body_size())
726 }
727}
728
/// Stream of `EventData<S>` values for an exchange; see the `Stream` impl in this file.
pub struct EventDataStream<'e, S: Event> {
    // State shared with the exchange this stream was created from.
    exchange: Rc<InnerExchange<S>>,
    // Reactor registration id plus the waker it was registered with, while one is pending.
    id_and_waker: Option<(WakerId, Waker)>,
    // When `true`, the reactor is marked paused after events that ask to pause.
    buffering: bool,
    // Carries the lifetime `'e` without storing a reference.
    _lifetime: PhantomData<&'e ()>,
}
736
737impl<'e, S: Event> EventDataStream<'e, S> {
738 fn process_event(&mut self, event: S) -> EventData<'e, S> {
739 let exchange = self.exchange.clone();
740 let reactor = exchange.reactor.as_ref();
741 let pause = self.buffering && event.should_pause();
742 reactor.set_paused(pause);
743 let offset = exchange.offset.get();
744 exchange.offset.set(event.body_size());
745 exchange.empty_stream.set(!event.should_pause());
746 EventData::from_inner(exchange, event, offset)
747 }
748}
749
// Drives the stream by inspecting the reactor's current event on every poll.
impl<'e, S: Event> Stream for EventDataStream<'e, S> {
    type Item = EventData<'e, S>;

    /// Yields the next `S` event as `EventData`, `None` once the stream is over, or
    /// `Pending` while nothing new is available.
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let reactor = self.exchange.reactor.clone();
        let exchange = self.exchange.clone();

        // The stream was marked empty (by `process_event` or at exchange creation): finish.
        if exchange.empty_stream.get() {
            if let Some((id, _)) = self.id_and_waker.take() {
                // Deregister the waker; this stream will not wait any more.
                reactor.remove_waker(S::kind(), id);
            }
            return Poll::Ready(None);
        }

        // The reactor has reached (or moved past) the event kind this stream yields.
        if reactor.current_event() >= S::kind() {
            let event_data = if let Some(event) = exchange.take_first_event() {
                // A stored first event takes precedence over the reactor's current one.
                if let Some((id, _)) = self.id_and_waker.take() {
                    reactor.remove_waker(S::kind(), id);
                }
                Some(self.process_event(event))
            } else {
                let event_count = reactor.event_count();

                if event_count > exchange.event_count.get() {
                    // The reactor has seen events since the last poll: remember the count
                    // and look at the current event.
                    exchange.event_count.set(event_count);
                    if let Ok(event) = S::try_from(reactor.cloned_finite_event()) {
                        Some(self.process_event(event))
                    } else {
                        // The current event is not an `S`: end the stream.
                        None
                    }
                } else {
                    // Nothing new since the last poll.
                    // NOTE(review): no waker is registered on this path — presumably the
                    // task is re-polled on every reactor event; confirm.
                    return Poll::Pending;
                }
            };

            Poll::Ready(event_data)
        } else {
            // The awaited event kind has not been reached yet: keep exactly one
            // registration with the reactor, bound to the task's current waker.
            match &self.id_and_waker {
                None => {
                    let id = reactor.insert_waker(S::kind(), cx.waker().clone());
                    self.id_and_waker = Some((id, cx.waker().clone()));
                }
                Some((id, w)) if !w.will_wake(cx.waker()) => {
                    // The task's waker changed: replace the stale registration.
                    reactor.remove_waker(S::kind(), *id);

                    let id = reactor.insert_waker(S::kind(), cx.waker().clone());
                    self.id_and_waker = Some((id, cx.waker().clone()));
                }
                // Already registered with an equivalent waker.
                Some(_) => {}
            }
            Poll::Pending
        }
    }
}