1use std::{
8 cell::{Cell, RefCell},
9 convert::TryFrom,
10 marker::PhantomData,
11 ops::Deref,
12 rc::Rc,
13 task::{Poll, Waker},
14};
15
16use futures::{Stream, StreamExt};
17
18use crate::reactor::http::ExchangePhase;
19use crate::{
20 host::Host,
21 reactor::http::{HttpReactor, WakerId},
22 types::HttpCid,
23};
24
/// Items that must be nameable in public bounds but not implementable or
/// callable from outside this module tree.
mod private {
    use crate::host::Host;

    /// Marker supertrait of `Event`. Because this module is private, code
    /// outside of it cannot name the trait and therefore cannot implement
    /// `Event` for its own types.
    pub trait Sealed {}

    /// Host body access for one specific body event type.
    ///
    /// The implementations in this file forward to the request or the
    /// response body functions on `Host`.
    pub trait BodyAccessor {
        /// Asks `host` for body bytes using the given `offset` and `max_size`.
        /// Returns whatever the host returns, which may be `None`.
        fn read_body(host: &dyn Host, offset: usize, max_size: usize) -> Option<Vec<u8>>;

        /// Hands `value` to `host` together with the given `offset` and `size`.
        fn write_body(host: &dyn Host, offset: usize, size: usize, value: &[u8]);
    }
}
36
37use private::{BodyAccessor, Sealed};
38
/// A typed HTTP exchange event.
///
/// The trait is sealed; every implementation is generated by the
/// `finite_events!` macro, and each event converts to and from `FiniteEvent`.
pub trait Event: Sealed + Clone + Into<FiniteEvent> + TryFrom<FiniteEvent> + Unpin {
    /// The `EventKind` that corresponds to this event type.
    fn kind() -> EventKind;

    /// Same value as [`Event::kind`], reachable from an instance.
    fn event_kind(&self) -> EventKind {
        Self::kind()
    }

    /// Body size carried by the event. The generated implementations return
    /// the `body_size` field for body events and `0` for every other event.
    fn body_size(&self) -> usize;

    /// Whether processing should pause after this event. The generated
    /// implementations return `!end_of_stream` for body events and `false`
    /// for every other event.
    fn should_pause(&self) -> bool;

    /// The `end_of_stream` field for headers and body events; `false` for
    /// every other event.
    fn end_of_stream(&self) -> bool;
}
53
/// Marker: the implementing event is declared after `S` in the list passed to
/// `finite_events!`. All implementations are generated by that macro.
pub trait After<S: Event>: Event {}
56
/// Marker: the implementing event is declared before `S`. It is the mirror
/// image of `After` and is used as the bound of the `wait_for_*` methods.
pub trait Before<S: Event>: Event {}
59
/// Derives `Before` from `After`: `A` is before `B` whenever `B` is after `A`.
impl<A, B> Before<B> for A
where
    B: After<A>,
    A: Event,
{
}
66
/// An event whose body can be read and written through `BodyAccessor`.
pub trait BodyEvent: Event + BodyAccessor {}
69
/// Marker for header-like events (headers and trailers); it is the bound
/// required by `Exchange::event_data`.
pub trait HeadersEvent: Event {}
72
/// First entry of the event list passed to `finite_events!`.
#[derive(Clone, Debug)]
pub struct Start {
    // Context id recorded with the event; not read anywhere in this file.
    pub(crate) _context_id: HttpCid,
}
78
/// Request headers event.
#[derive(Clone, Debug)]
pub struct RequestHeaders {
    // Presumably the header count reported by the host; not read in this file.
    pub(crate) _num_headers: usize,
    // Returned by `Event::end_of_stream` for this event.
    pub(crate) end_of_stream: bool,
}

impl HeadersEvent for RequestHeaders {}
87
/// Request body event.
#[derive(Clone, Debug)]
pub struct RequestBody {
    // Returned by `Event::body_size` for this event.
    pub(crate) body_size: usize,
    // Returned by `Event::end_of_stream`; `should_pause` is its negation.
    pub(crate) end_of_stream: bool,
}

impl BodyEvent for RequestBody {}
96
/// Routes body access for `RequestBody` to the host's request body functions.
impl BodyAccessor for RequestBody {
    /// Forwards to `Host::get_http_request_body`.
    fn read_body(host: &dyn Host, offset: usize, max_size: usize) -> Option<Vec<u8>> {
        host.get_http_request_body(offset, max_size)
    }

    /// Forwards to `Host::set_http_request_body`.
    fn write_body(host: &dyn Host, offset: usize, size: usize, value: &[u8]) {
        host.set_http_request_body(offset, size, value)
    }
}
106
// Generates `HeadersAccessor` for `EventDataStream<'_, E>` for every listed
// event type `E`, forwarding each method to the named host function.
//
// The request and response variants differ only in which host functions they
// call, so a single template keeps the implementations from drifting apart.
// `set_header` and `remove_header` share the `set` function: the former passes
// `Some(value)`, the latter `None`.
#[cfg(feature = "experimental_enable_stop_iteration")]
macro_rules! impl_stream_headers_accessor {
    (
        events: [$($event:ident),+ $(,)?],
        get: $get:ident,
        get_all: $get_all:ident,
        add: $add:ident,
        set: $set:ident,
        set_all: $set_all:ident $(,)?
    ) => {
        $(
            impl HeadersAccessor for EventDataStream<'_, $event> {
                fn header(&self, name: &str) -> Option<String> {
                    self.exchange.host.$get(name)
                }

                fn headers(&self) -> Vec<(String, String)> {
                    self.exchange.host.$get_all()
                }

                fn add_header(&self, name: &str, value: &str) {
                    self.exchange.host.$add(name, value);
                }

                fn set_header(&self, name: &str, value: &str) {
                    self.exchange.host.$set(name, Some(value));
                }

                fn set_headers(&self, headers: Vec<(&str, &str)>) {
                    self.exchange.host.$set_all(headers);
                }

                fn remove_header(&self, name: &str) {
                    self.exchange.host.$set(name, None);
                }
            }
        )+
    };
}

// Request header access from both the request headers and request body streams.
#[cfg(feature = "experimental_enable_stop_iteration")]
impl_stream_headers_accessor! {
    events: [RequestHeaders, RequestBody],
    get: get_http_request_header,
    get_all: get_http_request_headers,
    add: add_http_request_header,
    set: set_http_request_header,
    set_all: set_http_request_headers,
}

// Response header access from both the response headers and response body streams.
#[cfg(feature = "experimental_enable_stop_iteration")]
impl_stream_headers_accessor! {
    events: [ResponseHeaders, ResponseBody],
    get: get_http_response_header,
    get_all: get_http_response_headers,
    add: add_http_response_header,
    set: set_http_response_header,
    set_all: set_http_response_headers,
}
222
/// Request trailers event.
#[derive(Clone, Debug)]
pub struct RequestTrailers {
    // Presumably the trailer count reported by the host; not read in this file.
    pub(crate) _num_trailers: usize,
}

impl HeadersEvent for RequestTrailers {}
230
/// Response headers event.
#[derive(Clone, Debug)]
pub struct ResponseHeaders {
    // Presumably the header count reported by the host; not read in this file.
    pub(crate) _num_headers: usize,
    // Returned by `Event::end_of_stream` for this event.
    pub(crate) end_of_stream: bool,
}

impl HeadersEvent for ResponseHeaders {}
239
/// Response body event.
#[derive(Clone, Debug)]
pub struct ResponseBody {
    // Returned by `Event::body_size` for this event.
    pub(crate) body_size: usize,
    // Returned by `Event::end_of_stream`; `should_pause` is its negation.
    pub(crate) end_of_stream: bool,
}

impl BodyEvent for ResponseBody {}
248
/// Routes body access for `ResponseBody` to the host's response body functions.
impl BodyAccessor for ResponseBody {
    /// Forwards to `Host::get_http_response_body`.
    fn read_body(host: &dyn Host, offset: usize, max_size: usize) -> Option<Vec<u8>> {
        host.get_http_response_body(offset, max_size)
    }

    /// Forwards to `Host::set_http_response_body`.
    fn write_body(host: &dyn Host, offset: usize, size: usize, value: &[u8]) {
        host.set_http_response_body(offset, size, value)
    }
}
258
/// Response trailers event.
#[derive(Clone, Debug)]
pub struct ResponseTrailers {
    // Presumably the trailer count reported by the host; not read in this file.
    pub(crate) _num_trailers: usize,
}

impl HeadersEvent for ResponseTrailers {}
266
/// Last entry of the event list passed to `finite_events!`; carries no data.
#[derive(Clone, Debug)]
pub struct ExchangeComplete {}
270
// Expands to the body of `Event::should_pause` for the given event type.
// Arms are tried in order, so the literal `RequestBody` / `ResponseBody`
// arms must stay above the catch-all `$event:ty` arm.
macro_rules! should_pause {
    // Body events pause until the end of the stream has been seen.
    (RequestBody, $value:expr) => {
        !$value.end_of_stream
    };

    (ResponseBody, $value:expr) => {
        !$value.end_of_stream
    };

    // Every other event never pauses.
    ($event:ty, $value:expr) => {
        false
    };
}
284
// Expands to the body of `Event::end_of_stream` for the given event type.
// The four event types that own an `end_of_stream` field return it; the
// catch-all arm (which must stay last) yields `false`.
macro_rules! end_of_stream {
    (RequestBody, $value:expr) => {
        $value.end_of_stream
    };

    (ResponseBody, $value:expr) => {
        $value.end_of_stream
    };

    (RequestHeaders, $value:expr) => {
        $value.end_of_stream
    };

    (ResponseHeaders, $value:expr) => {
        $value.end_of_stream
    };

    // Events without an `end_of_stream` field.
    ($event:ty, $value:expr) => {
        false
    };
}
306
// Expands to the body of `Event::body_size` for the given event type.
// Body events return their `body_size` field; the catch-all arm (which must
// stay last) yields `0`.
macro_rules! body_size {
    (RequestBody, $value:expr) => {
        $value.body_size
    };

    (ResponseBody, $value:expr) => {
        $value.body_size
    };

    // Events that carry no body.
    ($event:ty, $value:expr) => {
        0
    };
}
320
// Given a comma-separated list of event types, implements `After<X>` for the
// first type for every `X` in the remainder of the list, then recurses on
// the remainder. `after!` feeds it the event list in reverse declaration
// order, so every event ends up `After` each event declared before it.
macro_rules! impl_after {

    // Base case: a single type has nothing left to come after.
    ($event:ty) => {};

    ($event:ty, $($after:ty),*) => {
        $(impl After<$after> for $event {})*
        impl_after!($($after),*);
    };
}
331
// Reverses the bracketed token list by moving one token at a time in front
// of the accumulator, then passes the reversed list to `impl_after!`.
macro_rules! after {

    // Input exhausted: the accumulator now holds the list in reverse order.
    ([] $($reversed:tt)*) => {
        impl_after!($($reversed),*);
    };

    // Move the head of the input to the front of the accumulator.
    ([$first:tt $($rest:tt)*] $($reversed:tt)*) => {
        after!([$($rest)*] $first $($reversed)*);
    };
}
343
// Generates everything that depends on the full, ordered list of event types:
//   * `EventKind`, a fieldless enum with one variant per event;
//   * `FiniteEvent`, an enum wrapping one value of each event type;
//   * the `After` relations, derived from the declaration order;
//   * per event: `Sealed`, `Event`, `From<event> for FiniteEvent` and
//     `TryFrom<FiniteEvent> for event`.
// Each listed identifier must name a struct that is already in scope.
macro_rules! finite_events {

    ($($event:ident,)+) => {

        #[derive(Clone, Copy, PartialEq, Eq, Debug)]
        pub enum EventKind {
            $($event),+
        }

        #[derive(Clone, Debug)]
        pub enum FiniteEvent {
            $($event($event)),+
        }

        impl FiniteEvent {
            pub fn kind(&self) -> EventKind {
                match self {
                    $(Self::$event(_) => EventKind::$event),+
                }
            }

            pub fn end_of_stream(&self) -> bool {
                match self {
                    $(Self::$event(e) => e.end_of_stream()),+
                }
            }
        }

        // Declaration order defines the `After` / `Before` relations.
        after!([$($event)*]);

        $(
            impl Sealed for $event {}

            impl Event for $event {
                fn kind() -> EventKind {
                    EventKind::$event
                }

                fn should_pause(&self) -> bool {
                    should_pause!($event, self)
                }

                fn body_size(&self) -> usize {
                    body_size!($event, self)
                }

                fn end_of_stream(&self) -> bool {
                    end_of_stream!($event, self)
                }
            }

            impl From<$event> for FiniteEvent {
                fn from(event: $event) -> Self {
                    Self::$event(event)
                }
            }

            impl TryFrom<FiniteEvent> for $event {
                // A failed conversion hands the unconverted event back.
                type Error = FiniteEvent;

                fn try_from(finite_event: FiniteEvent) -> Result<Self, FiniteEvent> {
                    match finite_event {
                        FiniteEvent::$event(e) => Ok(e),
                        e => Err(e),
                    }
                }
            }
        )*
    };
}
416
// The complete event list. Order is significant: it fixes the `EventKind`
// discriminants (and therefore the `Ord` implementation below) as well as the
// generated `After` / `Before` relations.
finite_events! {
    Start,
    RequestHeaders,
    RequestBody,
    RequestTrailers,
    ResponseHeaders,
    ResponseBody,
    ResponseTrailers,
    ExchangeComplete,
}
427
/// Total order, so this simply delegates to `Ord`.
impl PartialOrd for EventKind {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
433
434impl Ord for EventKind {
435 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
436 self.index().cmp(&other.index())
437 }
438}
439
impl EventKind {
    /// Discriminant of the variant, i.e. its zero-based position in the
    /// `finite_events!` list.
    fn index(&self) -> usize {
        *self as usize
    }
}
445
/// State shared, through `Rc`, between an `Exchange` and the `EventData` /
/// `EventDataStream` values created from it.
struct InnerExchange<S: Event> {
    reactor: Rc<HttpReactor>,
    host: Rc<dyn Host>,
    // Event that is already available; taken by the first stream poll that
    // finds the reactor at or past `S::kind()`.
    first_event: RefCell<Option<S>>,
    // When set, streams over this exchange finish immediately.
    empty_stream: Cell<bool>,
    // Offset handed to the next `EventData`; updated by `process_event`.
    offset: Cell<usize>,
    // Last reactor event count observed by `poll_next`.
    event_count: Cell<usize>,
}
454
455impl<S: Event> InnerExchange<S> {
456 fn take_first_event(&self) -> Option<S> {
457 self.first_event.borrow_mut().take()
458 }
459}
460
/// Handle on an HTTP exchange, typed by the event `S` it is currently
/// positioned at (`Start` by default).
///
/// The `wait_for_*` methods consume the handle and return one typed by a later
/// event. Dropping the handle may resume the paused request or response; see
/// the `Drop` implementation.
pub struct Exchange<S: Event = Start> {
    pub(crate) reactor: Rc<HttpReactor>,
    pub(crate) host: Rc<dyn Host>,
    // State shared with the event data and streams created from this handle.
    inner: Rc<InnerExchange<S>>,
}
467
/// One event of type `S` together with access to the exchange it came from.
pub struct EventData<'a, S: Event> {
    exchange: Rc<InnerExchange<S>>,
    pub(crate) event: S,
    // Exchange offset captured when the event was processed.
    offset: usize,
    // Carries `'a` only; `EventData::new` ties it to the borrow of the `Exchange`.
    _lifetime: PhantomData<&'a ()>,
}
475
476impl<'a, S: Event> EventData<'a, S> {
477 pub(crate) fn new(exchange: &'a Exchange<S>, event: S, offset: usize) -> Self {
478 Self {
479 exchange: exchange.inner.clone(),
480 event,
481 offset,
482 _lifetime: PhantomData,
483 }
484 }
485
486 fn from_inner(exchange: Rc<InnerExchange<S>>, event: S, offset: usize) -> Self {
487 Self {
488 exchange,
489 event,
490 offset,
491 _lifetime: PhantomData,
492 }
493 }
494}
495
/// Read and write access to the headers of the current request or response.
///
/// The implementations in this file forward every call to the matching
/// `Host` header function.
pub trait HeadersAccessor {
    /// Value of the header `name`, as returned by the host.
    fn header(&self, name: &str) -> Option<String>;

    /// All headers as `(name, value)` pairs.
    fn headers(&self) -> Vec<(String, String)>;

    /// Adds a header through the host's `add` function.
    fn add_header(&self, name: &str, value: &str);

    /// Sets the header `name` to `value`.
    fn set_header(&self, name: &str, value: &str);

    /// Hands the complete `headers` list to the host's bulk setter.
    fn set_headers(&self, headers: Vec<(&str, &str)>);

    /// Removes the header `name`; the implementations pass `None` to the
    /// host setter also used by `set_header`.
    fn remove_header(&self, name: &str);
}
520
521impl HeadersAccessor for EventData<'_, RequestHeaders> {
522 fn header(&self, name: &str) -> Option<String> {
523 self.exchange.host.get_http_request_header(name)
524 }
525
526 fn headers(&self) -> Vec<(String, String)> {
527 self.exchange.host.get_http_request_headers()
528 }
529
530 fn add_header(&self, name: &str, value: &str) {
531 self.exchange.host.add_http_request_header(name, value);
532 }
533
534 fn set_header(&self, name: &str, value: &str) {
535 self.exchange
536 .host
537 .set_http_request_header(name, Some(value));
538 }
539
540 fn set_headers(&self, headers: Vec<(&str, &str)>) {
541 self.exchange.host.set_http_request_headers(headers);
542 }
543
544 fn remove_header(&self, name: &str) {
545 self.exchange.host.set_http_request_header(name, None);
546 }
547}
548
549impl EventData<'_, RequestTrailers> {
550 pub fn header(&self, name: &str) -> Option<String> {
551 self.exchange.host.get_http_request_trailer(name)
552 }
553
554 pub fn headers(&self) -> Vec<(String, String)> {
555 self.exchange.host.get_http_request_trailers()
556 }
557}
558
559impl HeadersAccessor for EventData<'_, ResponseHeaders> {
560 fn header(&self, name: &str) -> Option<String> {
561 self.exchange.host.get_http_response_header(name)
562 }
563
564 fn headers(&self) -> Vec<(String, String)> {
565 self.exchange.host.get_http_response_headers()
566 }
567
568 fn add_header(&self, name: &str, value: &str) {
569 self.exchange.host.add_http_response_header(name, value);
570 }
571
572 fn set_header(&self, name: &str, value: &str) {
573 self.exchange
574 .host
575 .set_http_response_header(name, Some(value));
576 }
577
578 fn set_headers(&self, headers: Vec<(&str, &str)>) {
579 self.exchange.host.set_http_response_headers(headers);
580 }
581
582 fn remove_header(&self, name: &str) {
583 self.exchange.host.set_http_response_header(name, None);
584 }
585}
586
587impl<S: Event> Exchange<S> {
588 pub(crate) fn new(
589 reactor: Rc<HttpReactor>,
590 host: Rc<dyn Host>,
591 first_event: Option<S>,
592 ) -> Self {
593 let empty_stream = first_event
594 .as_ref()
595 .map(|e| !e.should_pause())
596 .unwrap_or(false);
597 Self {
598 reactor: reactor.clone(),
599 host: host.clone(),
600 inner: Rc::new(InnerExchange {
601 reactor,
602 host,
603 first_event: RefCell::new(first_event),
604 empty_stream: Cell::new(empty_stream),
605 event_count: Cell::new(0),
606 offset: Cell::new(0),
607 }),
608 }
609 }
610
611 pub fn event_data(&self) -> Option<EventData<S>>
612 where
613 S: HeadersEvent,
614 {
615 let finite_event = self.reactor.cloned_finite_event();
616 S::try_from(finite_event)
617 .ok()
618 .map(|e| EventData::new(self, e, 0))
619 }
620
621 #[must_use]
622 pub fn event_data_stream(&self) -> EventDataStream<S> {
623 EventDataStream {
624 id_and_waker: None,
625 exchange: self.inner.clone(),
626 buffering: true,
627 _lifetime: PhantomData,
628 }
629 }
630
631 pub(crate) fn static_event_data_stream(&self) -> EventDataStream<'static, S>
632 where
633 S: BodyEvent,
634 {
635 EventDataStream {
636 id_and_waker: None,
637 exchange: self.inner.clone(),
638 buffering: false,
639 _lifetime: PhantomData,
640 }
641 }
642
643 pub(crate) async fn wait_for_event<E>(self) -> Exchange<E>
644 where
645 E: Event,
646 S: Before<E>,
647 {
648 self.wait_for_event_buffering(true).await
649 }
650
651 pub(crate) async fn wait_for_event_buffering<E>(self, buffering: bool) -> Exchange<E>
652 where
653 E: Event,
654 S: Before<E>,
655 {
656 let exchange: Exchange<E> =
657 Exchange::new(Rc::clone(&self.reactor), Rc::clone(&self.host), None);
658
659 drop(self);
661
662 let mut stream = EventDataStream {
663 id_and_waker: None,
664 exchange: exchange.inner.clone(),
665 buffering,
666 _lifetime: PhantomData,
667 };
668 let first_event = stream.next().await.map(|ed| ed.event);
669
670 *exchange.inner.first_event.borrow_mut() = first_event;
671 exchange.inner.offset.set(0);
672 exchange.inner.empty_stream.set(false);
673
674 exchange
675 }
676
677 pub async fn wait_for_request_headers(self) -> Exchange<RequestHeaders>
678 where
679 S: Before<RequestHeaders>,
680 {
681 self.wait_for_event().await
682 }
683
684 pub async fn wait_for_request_body(self) -> Exchange<RequestBody>
685 where
686 S: Before<RequestBody>,
687 {
688 self.wait_for_event().await
689 }
690
691 pub(crate) async fn _wait_for_request_trailers(self) -> Exchange<RequestTrailers>
692 where
693 S: Before<RequestTrailers>,
694 {
695 self.wait_for_event().await
696 }
697
698 pub async fn wait_for_response_headers(self) -> Exchange<ResponseHeaders>
699 where
700 S: Before<ResponseHeaders>,
701 {
702 self.wait_for_event().await
703 }
704
705 pub async fn wait_for_response_body(self) -> Exchange<ResponseBody>
706 where
707 S: Before<ResponseBody>,
708 {
709 self.wait_for_event().await
710 }
711
712 pub(crate) async fn _wait_for_response_trailers(self) -> Exchange<ResponseTrailers>
713 where
714 S: Before<ResponseTrailers>,
715 {
716 self.wait_for_event().await
717 }
718
719 pub(crate) async fn _wait_for_exchange_complete(self) -> Exchange<ExchangeComplete>
720 where
721 S: Before<ExchangeComplete>,
722 {
723 self.wait_for_event().await
724 }
725
726 pub fn send_response(self, status_code: u32, headers: Vec<(&str, &str)>, body: Option<&[u8]>)
727 where
728 S: After<Start> + Before<ResponseBody>,
729 {
730 self.host
731 .set_effective_context(self.reactor.context_id().into());
732 self.reactor.set_paused(true);
733 self.reactor.cancel_request();
734 self.host.send_http_response(status_code, headers, body);
735 }
736
737 fn should_resume(&self, current: EventKind, next: EventKind) -> bool {
739 let feature_enabled = cfg!(feature = "experimental_enable_stop_iteration");
740 let current_match = self.is_event(current);
741 let next_match = next == self.reactor.current_event();
742
743 !feature_enabled || !current_match || !next_match
744 }
745
746 fn should_resume_request(&self) -> bool {
747 self.should_resume(EventKind::RequestHeaders, EventKind::RequestBody)
748 }
749
750 fn should_resume_response(&self) -> bool {
751 self.should_resume(EventKind::ResponseHeaders, EventKind::ResponseBody)
752 }
753
754 fn is_event(&self, kind: EventKind) -> bool {
755 self.inner
756 .first_event
757 .borrow()
758 .as_ref()
759 .map(|d| d.event_kind() == kind)
760 .unwrap_or(false)
761 }
762}
763
764impl<S: Event> Drop for Exchange<S> {
765 fn drop(&mut self) {
766 let reactor = &self.reactor;
767 let host = &self.host;
768 if !reactor.is_done()
769 && reactor.paused()
770 && ((!reactor.cancelled_request() && reactor.phase() == ExchangePhase::Request)
771 || (!reactor.cancelled_response() && reactor.phase() == ExchangePhase::Response))
772 {
773 match reactor.phase() {
774 ExchangePhase::Request => {
775 if self.should_resume_request() {
776 reactor.set_paused(false);
777 reactor.set_eos_paused(false);
778 host.set_effective_context(reactor.context_id().into());
779 host.resume_http_request()
780 }
781 }
782 ExchangePhase::Response => {
783 if self.should_resume_response() {
784 reactor.set_paused(false);
785 reactor.set_eos_paused(false);
786 host.set_effective_context(reactor.context_id().into());
787 host.resume_http_response()
788 }
789 }
790 }
791 }
792 }
793}
794
795impl<S> EventData<'_, S>
796where
797 S: BodyEvent,
798{
799 pub fn offset(&self) -> usize {
800 self.offset
801 }
802
803 pub fn chunk_size(&self) -> usize {
804 self.event.body_size()
805 }
806
807 pub fn read_body(&self, offset: usize, max_size: usize) -> Vec<u8> {
808 S::read_body(self.exchange.host.deref(), offset, max_size).unwrap_or_default()
809 }
810
811 pub fn read_chunk(&self) -> Vec<u8> {
812 self.read_body(self.offset, self.event.body_size())
813 }
814
815 pub fn read_payload(&self) -> Vec<u8> {
816 self.read_body(0, self.event.body_size())
817 }
818}
819
/// Stream of `EventData` for events of type `S`.
pub struct EventDataStream<'e, S: Event> {
    exchange: Rc<InnerExchange<S>>,
    // Waker currently registered with the reactor, together with the id
    // needed to remove it again.
    id_and_waker: Option<(WakerId, Waker)>,
    // When true, an event whose `should_pause()` is true pauses the reactor.
    buffering: bool,
    // Carries `'e` only; no data is borrowed.
    _lifetime: PhantomData<&'e ()>,
}
827
828impl<'e, S: Event> EventDataStream<'e, S> {
829 fn process_event(&mut self, event: S) -> EventData<'e, S> {
830 let exchange = self.exchange.clone();
831 let reactor = exchange.reactor.as_ref();
832
833 let pause = self.buffering && event.should_pause() || reactor.eos_paused();
834
835 reactor.set_paused(pause);
836 let offset = exchange.offset.get();
837 exchange.offset.set(event.body_size());
838 exchange.empty_stream.set(!event.should_pause());
839 EventData::from_inner(exchange, event, offset)
840 }
841}
842
/// Yields the events of type `S` observed on the reactor.
impl<'e, S: Event> Stream for EventDataStream<'e, S> {
    type Item = EventData<'e, S>;

    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let reactor = self.exchange.reactor.clone();
        let exchange = self.exchange.clone();

        // The exchange has been marked as having nothing more to stream:
        // deregister any waker and finish.
        if exchange.empty_stream.get() {
            if let Some((id, _)) = self.id_and_waker.take() {
                reactor.remove_waker(S::kind(), id);
            }
            return Poll::Ready(None);
        }

        if reactor.current_event() >= S::kind() {
            // The reactor has reached (or passed) the event kind of this stream.
            let event_data = if let Some(event) = exchange.take_first_event() {
                // A stored first event takes precedence over the reactor state.
                if let Some((id, _)) = self.id_and_waker.take() {
                    reactor.remove_waker(S::kind(), id);
                }
                Some(self.process_event(event))
            } else {
                let event_count = reactor.event_count();

                if event_count > exchange.event_count.get() {
                    // The reactor has seen a new event since the last poll.
                    exchange.event_count.set(event_count);
                    if let Ok(event) = S::try_from(reactor.cloned_finite_event()) {
                        Some(self.process_event(event))
                    } else {
                        // The current event is not of type `S`: the stream ends.
                        None
                    }
                } else {
                    // NOTE(review): no waker is registered on this path; it
                    // appears to rely on a previously registered waker or on
                    // the executor re-polling after each event. Confirm.
                    return Poll::Pending;
                }
            };

            Poll::Ready(event_data)
        } else {
            // Not there yet: make sure the reactor holds an up-to-date waker.
            match &self.id_and_waker {
                None => {
                    let id = reactor.insert_waker(S::kind(), cx.waker().clone());
                    self.id_and_waker = Some((id, cx.waker().clone()));
                }
                // The task's waker changed: replace the registered one.
                Some((id, w)) if !w.will_wake(cx.waker()) => {
                    reactor.remove_waker(S::kind(), *id);

                    let id = reactor.insert_waker(S::kind(), cx.waker().clone());
                    self.id_and_waker = Some((id, cx.waker().clone()));
                }
                // The registered waker is still valid.
                Some(_) => {}
            }
            Poll::Pending
        }
    }
}
905}