Skip to main content

pdk_classy/
event.rs

1// Copyright (c) 2026, Salesforce, Inc.,
2// All rights reserved.
3// For full license text, see the LICENSE.txt file
4
5//! Abstractions of the proxy wasm underlying events.
6
7use std::{
8    cell::{Cell, RefCell},
9    convert::TryFrom,
10    marker::PhantomData,
11    ops::Deref,
12    rc::Rc,
13    task::{Poll, Waker},
14};
15
16use futures::{Stream, StreamExt};
17
18use crate::reactor::http::ExchangePhase;
19use crate::{
20    host::Host,
21    reactor::http::{HttpReactor, WakerId},
22    types::HttpCid,
23};
24
25mod private {
26    use crate::host::Host;
27
28    pub trait Sealed {}
29
30    pub trait BodyAccessor {
31        fn read_body(host: &dyn Host, offset: usize, max_size: usize) -> Option<Vec<u8>>;
32
33        fn write_body(host: &dyn Host, offset: usize, size: usize, value: &[u8]);
34    }
35}
36
37use private::{BodyAccessor, Sealed};
38
39/// Common interface that all underlying events share.
40pub trait Event: Sealed + Clone + Into<FiniteEvent> + TryFrom<FiniteEvent> + Unpin {
41    fn kind() -> EventKind;
42
43    fn event_kind(&self) -> EventKind {
44        Self::kind()
45    }
46
47    fn body_size(&self) -> usize;
48
49    fn should_pause(&self) -> bool;
50
51    fn end_of_stream(&self) -> bool;
52}
53
54/// Imposes an order of succession between two Events.
55pub trait After<S: Event>: Event {}
56
57/// Imposes an order of precedence between two Events.
58pub trait Before<S: Event>: Event {}
59
60impl<A, B> Before<B> for A
61where
62    B: After<A>,
63    A: Event,
64{
65}
66
67/// Events that can access the body and verify if more body remains.
68pub trait BodyEvent: Event + BodyAccessor {}
69
70/// Events that can access the headers.
71pub trait HeadersEvent: Event {}
72
73/// Alias name for CreateContext event
74#[derive(Clone, Debug)]
75pub struct Start {
76    pub(crate) _context_id: HttpCid,
77}
78
79#[derive(Clone, Debug)]
80/// Represents the state where the request headers are available.
81pub struct RequestHeaders {
82    pub(crate) _num_headers: usize,
83    pub(crate) end_of_stream: bool,
84}
85
86impl HeadersEvent for RequestHeaders {}
87
88#[derive(Clone, Debug)]
89/// Represents the state where the request body is available.
90pub struct RequestBody {
91    pub(crate) body_size: usize,
92    pub(crate) end_of_stream: bool,
93}
94
95impl BodyEvent for RequestBody {}
96
97impl BodyAccessor for RequestBody {
98    fn read_body(host: &dyn Host, offset: usize, max_size: usize) -> Option<Vec<u8>> {
99        host.get_http_request_body(offset, max_size)
100    }
101
102    fn write_body(host: &dyn Host, offset: usize, size: usize, value: &[u8]) {
103        host.set_http_request_body(offset, size, value)
104    }
105}
106
// Request-header access directly from a `RequestHeaders` stream; only compiled with the
// `experimental_enable_stop_iteration` feature.
#[cfg(feature = "experimental_enable_stop_iteration")]
impl HeadersAccessor for EventDataStream<'_, RequestHeaders> {
    fn header(&self, name: &str) -> Option<String> {
        let host = &self.exchange.host;
        host.get_http_request_header(name)
    }

    fn headers(&self) -> Vec<(String, String)> {
        let host = &self.exchange.host;
        host.get_http_request_headers()
    }

    fn add_header(&self, name: &str, value: &str) {
        let host = &self.exchange.host;
        host.add_http_request_header(name, value);
    }

    fn set_header(&self, name: &str, value: &str) {
        let host = &self.exchange.host;
        host.set_http_request_header(name, Some(value));
    }

    fn set_headers(&self, headers: Vec<(&str, &str)>) {
        let host = &self.exchange.host;
        host.set_http_request_headers(headers);
    }

    fn remove_header(&self, name: &str) {
        // Removal is expressed as setting the header to `None`.
        let host = &self.exchange.host;
        host.set_http_request_header(name, None);
    }
}
135
// Request-header access from a `RequestBody` stream; only compiled with the
// `experimental_enable_stop_iteration` feature.
#[cfg(feature = "experimental_enable_stop_iteration")]
impl HeadersAccessor for EventDataStream<'_, RequestBody> {
    fn header(&self, name: &str) -> Option<String> {
        let host = &self.exchange.host;
        host.get_http_request_header(name)
    }

    fn headers(&self) -> Vec<(String, String)> {
        let host = &self.exchange.host;
        host.get_http_request_headers()
    }

    fn add_header(&self, name: &str, value: &str) {
        let host = &self.exchange.host;
        host.add_http_request_header(name, value);
    }

    fn set_header(&self, name: &str, value: &str) {
        let host = &self.exchange.host;
        host.set_http_request_header(name, Some(value));
    }

    fn set_headers(&self, headers: Vec<(&str, &str)>) {
        let host = &self.exchange.host;
        host.set_http_request_headers(headers);
    }

    fn remove_header(&self, name: &str) {
        // Removal is expressed as setting the header to `None`.
        let host = &self.exchange.host;
        host.set_http_request_header(name, None);
    }
}
164
// Response-header access directly from a `ResponseHeaders` stream; only compiled with the
// `experimental_enable_stop_iteration` feature.
#[cfg(feature = "experimental_enable_stop_iteration")]
impl HeadersAccessor for EventDataStream<'_, ResponseHeaders> {
    fn header(&self, name: &str) -> Option<String> {
        let host = &self.exchange.host;
        host.get_http_response_header(name)
    }

    fn headers(&self) -> Vec<(String, String)> {
        let host = &self.exchange.host;
        host.get_http_response_headers()
    }

    fn add_header(&self, name: &str, value: &str) {
        let host = &self.exchange.host;
        host.add_http_response_header(name, value);
    }

    fn set_header(&self, name: &str, value: &str) {
        let host = &self.exchange.host;
        host.set_http_response_header(name, Some(value));
    }

    fn set_headers(&self, headers: Vec<(&str, &str)>) {
        let host = &self.exchange.host;
        host.set_http_response_headers(headers);
    }

    fn remove_header(&self, name: &str) {
        // Removal is expressed as setting the header to `None`.
        let host = &self.exchange.host;
        host.set_http_response_header(name, None);
    }
}
193
// Response-header access from a `ResponseBody` stream; only compiled with the
// `experimental_enable_stop_iteration` feature.
#[cfg(feature = "experimental_enable_stop_iteration")]
impl HeadersAccessor for EventDataStream<'_, ResponseBody> {
    fn header(&self, name: &str) -> Option<String> {
        let host = &self.exchange.host;
        host.get_http_response_header(name)
    }

    fn headers(&self) -> Vec<(String, String)> {
        let host = &self.exchange.host;
        host.get_http_response_headers()
    }

    fn add_header(&self, name: &str, value: &str) {
        let host = &self.exchange.host;
        host.add_http_response_header(name, value);
    }

    fn set_header(&self, name: &str, value: &str) {
        let host = &self.exchange.host;
        host.set_http_response_header(name, Some(value));
    }

    fn set_headers(&self, headers: Vec<(&str, &str)>) {
        let host = &self.exchange.host;
        host.set_http_response_headers(headers);
    }

    fn remove_header(&self, name: &str) {
        // Removal is expressed as setting the header to `None`.
        let host = &self.exchange.host;
        host.set_http_response_header(name, None);
    }
}
222
223#[derive(Clone, Debug)]
224/// Represents the state where the request trailers are available.
225pub struct RequestTrailers {
226    pub(crate) _num_trailers: usize,
227}
228
229impl HeadersEvent for RequestTrailers {}
230
231#[derive(Clone, Debug)]
232/// Represents the state where the response headers are available.
233pub struct ResponseHeaders {
234    pub(crate) _num_headers: usize,
235    pub(crate) end_of_stream: bool,
236}
237
238impl HeadersEvent for ResponseHeaders {}
239
240#[derive(Clone, Debug)]
241/// Represents the state where the response body is available.
242pub struct ResponseBody {
243    pub(crate) body_size: usize,
244    pub(crate) end_of_stream: bool,
245}
246
247impl BodyEvent for ResponseBody {}
248
249impl BodyAccessor for ResponseBody {
250    fn read_body(host: &dyn Host, offset: usize, max_size: usize) -> Option<Vec<u8>> {
251        host.get_http_response_body(offset, max_size)
252    }
253
254    fn write_body(host: &dyn Host, offset: usize, size: usize, value: &[u8]) {
255        host.set_http_response_body(offset, size, value)
256    }
257}
258
259#[derive(Clone, Debug)]
260/// Represents the state where the response trailers are available.
261pub struct ResponseTrailers {
262    pub(crate) _num_trailers: usize,
263}
264
265impl HeadersEvent for ResponseTrailers {}
266
/// State in which the exchange has finished processing.
#[derive(Clone, Debug)]
pub struct ExchangeComplete {}
270
// Expands to the `should_pause` expression for an event type: body events pause while
// their `end_of_stream` flag is unset; every other event type never pauses.
macro_rules! should_pause {
    (RequestBody, $ev:expr) => { !$ev.end_of_stream };
    (ResponseBody, $ev:expr) => { !$ev.end_of_stream };
    ($other:ty, $ev:expr) => { false };
}
284
// Expands to the `end_of_stream` expression for an event type: header and body events
// read their own flag; every other event type yields `false`.
macro_rules! end_of_stream {
    (RequestBody, $ev:expr) => { $ev.end_of_stream };
    (ResponseBody, $ev:expr) => { $ev.end_of_stream };
    (RequestHeaders, $ev:expr) => { $ev.end_of_stream };
    (ResponseHeaders, $ev:expr) => { $ev.end_of_stream };
    ($other:ty, $ev:expr) => { false };
}
306
// Expands to the `body_size` expression for an event type: body events read their own
// field; every other event type yields `0`.
macro_rules! body_size {
    (RequestBody, $ev:expr) => { $ev.body_size };
    (ResponseBody, $ev:expr) => { $ev.body_size };
    ($other:ty, $ev:expr) => { 0 };
}
320
// Given events listed from latest to earliest, implements `After<earlier>` for each
// event with respect to every event that follows it in the list.
macro_rules! impl_after {
    // A single remaining event has nothing left to come after.
    ($last:ty) => {};

    // The head comes after every event in the tail; then repeat for the tail.
    ($head:ty, $($tail:ty),*) => {
        $(impl After<$tail> for $head {})*
        impl_after!($($tail),*);
    };
}
331
// Takes events in chronological order inside `[...]`, reverses them through an
// accumulator, and hands the reversed list to `impl_after!`.
macro_rules! after {
    // Input exhausted: the accumulator now holds the events latest-first.
    ([] $($acc:tt)*) => {
        impl_after!($($acc),*);
    };

    // Move the next event from the input list to the front of the accumulator.
    ([$head:tt $($tail:tt)*] $($acc:tt)*) => {
        after!([$($tail)*] $head $($acc)*);
    };
}
343
// Generates the closed set of events from an ordered, comma-terminated list of event type
// names: the `EventKind` tag enum, the `FiniteEvent` wrapper enum, the `After` ordering
// impls, and the `Sealed` / `Event` / `From` / `TryFrom` impls for every listed type.
macro_rules! finite_events {
    ($($name:ident,)+) => {
        /// Represents the data types of the events that can be processed.
        #[derive(Clone, Copy, PartialEq, Eq, Debug)]
        pub enum EventKind {
            $($name),+
        }

        /// Types of the events that can be processed.
        #[derive(Clone, Debug)]
        pub enum FiniteEvent {
            $($name($name)),+
        }

        impl FiniteEvent {
            /// Tag of the wrapped event.
            pub fn kind(&self) -> EventKind {
                match self {
                    $(Self::$name(_) => EventKind::$name),+
                }
            }

            /// Forwards to the wrapped event's `end_of_stream`.
            pub fn end_of_stream(&self) -> bool {
                match self {
                    $(Self::$name(inner) => inner.end_of_stream()),+
                }
            }
        }

        // Every listed event comes `After` each event listed before it.
        after!([$($name)*]);

        $(
            impl Sealed for $name {}

            impl Event for $name {
                fn kind() -> EventKind {
                    EventKind::$name
                }

                fn should_pause(&self) -> bool {
                    should_pause!($name, self)
                }

                fn body_size(&self) -> usize {
                    body_size!($name, self)
                }

                fn end_of_stream(&self) -> bool {
                    end_of_stream!($name, self)
                }
            }

            impl From<$name> for FiniteEvent {
                fn from(event: $name) -> Self {
                    Self::$name(event)
                }
            }

            impl TryFrom<FiniteEvent> for $name {
                // A failed conversion gives the untouched event back.
                type Error = FiniteEvent;

                fn try_from(finite_event: FiniteEvent) -> Result<Self, Self::Error> {
                    match finite_event {
                        FiniteEvent::$name(inner) => Ok(inner),
                        other => Err(other),
                    }
                }
            }
        )*
    };
}
416
417finite_events! {
418    Start,
419    RequestHeaders,
420    RequestBody,
421    RequestTrailers,
422    ResponseHeaders,
423    ResponseBody,
424    ResponseTrailers,
425    ExchangeComplete,
426}
427
428impl PartialOrd for EventKind {
429    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
430        Some(self.cmp(other))
431    }
432}
433
434impl Ord for EventKind {
435    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
436        self.index().cmp(&other.index())
437    }
438}
439
440impl EventKind {
441    fn index(&self) -> usize {
442        *self as usize
443    }
444}
445
446struct InnerExchange<S: Event> {
447    reactor: Rc<HttpReactor>,
448    host: Rc<dyn Host>,
449    first_event: RefCell<Option<S>>,
450    empty_stream: Cell<bool>,
451    offset: Cell<usize>,
452    event_count: Cell<usize>,
453}
454
455impl<S: Event> InnerExchange<S> {
456    fn take_first_event(&self) -> Option<S> {
457        self.first_event.borrow_mut().take()
458    }
459}
460
461/// A state machine which tracks the current state of the filter.
462pub struct Exchange<S: Event = Start> {
463    pub(crate) reactor: Rc<HttpReactor>,
464    pub(crate) host: Rc<dyn Host>,
465    inner: Rc<InnerExchange<S>>,
466}
467
468/// Manages the access to the data of each event.
469pub struct EventData<'a, S: Event> {
470    exchange: Rc<InnerExchange<S>>,
471    pub(crate) event: S,
472    offset: usize,
473    _lifetime: PhantomData<&'a ()>,
474}
475
476impl<'a, S: Event> EventData<'a, S> {
477    pub(crate) fn new(exchange: &'a Exchange<S>, event: S, offset: usize) -> Self {
478        Self {
479            exchange: exchange.inner.clone(),
480            event,
481            offset,
482            _lifetime: PhantomData,
483        }
484    }
485
486    fn from_inner(exchange: Rc<InnerExchange<S>>, event: S, offset: usize) -> Self {
487        Self {
488            exchange,
489            event,
490            offset,
491            _lifetime: PhantomData,
492        }
493    }
494}
495
/// Read and write access to the headers of an event.
pub trait HeadersAccessor {
    /// Returns the first value of the header called `name`, if any.
    ///
    /// Known limitation: the value is converted to a UTF-8 `String`; bytes that are not
    /// valid UTF-8 are parsed as ISO-8859-1 instead.
    fn header(&self, name: &str) -> Option<String>;

    /// Returns a copy of every header.
    ///
    /// Known limitation: values are converted to UTF-8 `String`s; bytes that are not
    /// valid UTF-8 are parsed as ISO-8859-1 instead.
    fn headers(&self) -> Vec<(String, String)>;

    /// Adds a new value for the header called `name`.
    fn add_header(&self, name: &str, value: &str);

    /// Replaces the value of the header called `name`.
    fn set_header(&self, name: &str, value: &str);

    /// Replaces the whole header set with `headers`.
    fn set_headers(&self, headers: Vec<(&str, &str)>);

    /// Removes the header called `name`.
    fn remove_header(&self, name: &str);
}
520
521impl HeadersAccessor for EventData<'_, RequestHeaders> {
522    fn header(&self, name: &str) -> Option<String> {
523        self.exchange.host.get_http_request_header(name)
524    }
525
526    fn headers(&self) -> Vec<(String, String)> {
527        self.exchange.host.get_http_request_headers()
528    }
529
530    fn add_header(&self, name: &str, value: &str) {
531        self.exchange.host.add_http_request_header(name, value);
532    }
533
534    fn set_header(&self, name: &str, value: &str) {
535        self.exchange
536            .host
537            .set_http_request_header(name, Some(value));
538    }
539
540    fn set_headers(&self, headers: Vec<(&str, &str)>) {
541        self.exchange.host.set_http_request_headers(headers);
542    }
543
544    fn remove_header(&self, name: &str) {
545        self.exchange.host.set_http_request_header(name, None);
546    }
547}
548
549impl EventData<'_, RequestTrailers> {
550    pub fn header(&self, name: &str) -> Option<String> {
551        self.exchange.host.get_http_request_trailer(name)
552    }
553
554    pub fn headers(&self) -> Vec<(String, String)> {
555        self.exchange.host.get_http_request_trailers()
556    }
557}
558
559impl HeadersAccessor for EventData<'_, ResponseHeaders> {
560    fn header(&self, name: &str) -> Option<String> {
561        self.exchange.host.get_http_response_header(name)
562    }
563
564    fn headers(&self) -> Vec<(String, String)> {
565        self.exchange.host.get_http_response_headers()
566    }
567
568    fn add_header(&self, name: &str, value: &str) {
569        self.exchange.host.add_http_response_header(name, value);
570    }
571
572    fn set_header(&self, name: &str, value: &str) {
573        self.exchange
574            .host
575            .set_http_response_header(name, Some(value));
576    }
577
578    fn set_headers(&self, headers: Vec<(&str, &str)>) {
579        self.exchange.host.set_http_response_headers(headers);
580    }
581
582    fn remove_header(&self, name: &str) {
583        self.exchange.host.set_http_response_header(name, None);
584    }
585}
586
587impl<S: Event> Exchange<S> {
588    pub(crate) fn new(
589        reactor: Rc<HttpReactor>,
590        host: Rc<dyn Host>,
591        first_event: Option<S>,
592    ) -> Self {
593        let empty_stream = first_event
594            .as_ref()
595            .map(|e| !e.should_pause())
596            .unwrap_or(false);
597        Self {
598            reactor: reactor.clone(),
599            host: host.clone(),
600            inner: Rc::new(InnerExchange {
601                reactor,
602                host,
603                first_event: RefCell::new(first_event),
604                empty_stream: Cell::new(empty_stream),
605                event_count: Cell::new(0),
606                offset: Cell::new(0),
607            }),
608        }
609    }
610
611    pub fn event_data(&self) -> Option<EventData<S>>
612    where
613        S: HeadersEvent,
614    {
615        let finite_event = self.reactor.cloned_finite_event();
616        S::try_from(finite_event)
617            .ok()
618            .map(|e| EventData::new(self, e, 0))
619    }
620
621    #[must_use]
622    pub fn event_data_stream(&self) -> EventDataStream<S> {
623        EventDataStream {
624            id_and_waker: None,
625            exchange: self.inner.clone(),
626            buffering: true,
627            _lifetime: PhantomData,
628        }
629    }
630
631    pub(crate) fn static_event_data_stream(&self) -> EventDataStream<'static, S>
632    where
633        S: BodyEvent,
634    {
635        EventDataStream {
636            id_and_waker: None,
637            exchange: self.inner.clone(),
638            buffering: false,
639            _lifetime: PhantomData,
640        }
641    }
642
643    pub(crate) async fn wait_for_event<E>(self) -> Exchange<E>
644    where
645        E: Event,
646        S: Before<E>,
647    {
648        self.wait_for_event_buffering(true).await
649    }
650
651    pub(crate) async fn wait_for_event_buffering<E>(self, buffering: bool) -> Exchange<E>
652    where
653        E: Event,
654        S: Before<E>,
655    {
656        let exchange: Exchange<E> =
657            Exchange::new(Rc::clone(&self.reactor), Rc::clone(&self.host), None);
658
659        // Ensure flow resume
660        drop(self);
661
662        let mut stream = EventDataStream {
663            id_and_waker: None,
664            exchange: exchange.inner.clone(),
665            buffering,
666            _lifetime: PhantomData,
667        };
668        let first_event = stream.next().await.map(|ed| ed.event);
669
670        *exchange.inner.first_event.borrow_mut() = first_event;
671        exchange.inner.offset.set(0);
672        exchange.inner.empty_stream.set(false);
673
674        exchange
675    }
676
677    pub async fn wait_for_request_headers(self) -> Exchange<RequestHeaders>
678    where
679        S: Before<RequestHeaders>,
680    {
681        self.wait_for_event().await
682    }
683
684    pub async fn wait_for_request_body(self) -> Exchange<RequestBody>
685    where
686        S: Before<RequestBody>,
687    {
688        self.wait_for_event().await
689    }
690
691    pub(crate) async fn _wait_for_request_trailers(self) -> Exchange<RequestTrailers>
692    where
693        S: Before<RequestTrailers>,
694    {
695        self.wait_for_event().await
696    }
697
698    pub async fn wait_for_response_headers(self) -> Exchange<ResponseHeaders>
699    where
700        S: Before<ResponseHeaders>,
701    {
702        self.wait_for_event().await
703    }
704
705    pub async fn wait_for_response_body(self) -> Exchange<ResponseBody>
706    where
707        S: Before<ResponseBody>,
708    {
709        self.wait_for_event().await
710    }
711
712    pub(crate) async fn _wait_for_response_trailers(self) -> Exchange<ResponseTrailers>
713    where
714        S: Before<ResponseTrailers>,
715    {
716        self.wait_for_event().await
717    }
718
719    pub(crate) async fn _wait_for_exchange_complete(self) -> Exchange<ExchangeComplete>
720    where
721        S: Before<ExchangeComplete>,
722    {
723        self.wait_for_event().await
724    }
725
726    pub fn send_response(self, status_code: u32, headers: Vec<(&str, &str)>, body: Option<&[u8]>)
727    where
728        S: After<Start> + Before<ResponseBody>,
729    {
730        self.host
731            .set_effective_context(self.reactor.context_id().into());
732        self.reactor.set_paused(true);
733        self.reactor.cancel_request();
734        self.host.send_http_response(status_code, headers, body);
735    }
736
737    // Avoid resuming if stop iteration changed the header state to the body state
738    fn should_resume(&self, current: EventKind, next: EventKind) -> bool {
739        let feature_enabled = cfg!(feature = "experimental_enable_stop_iteration");
740        let current_match = self.is_event(current);
741        let next_match = next == self.reactor.current_event();
742
743        !feature_enabled || !current_match || !next_match
744    }
745
746    fn should_resume_request(&self) -> bool {
747        self.should_resume(EventKind::RequestHeaders, EventKind::RequestBody)
748    }
749
750    fn should_resume_response(&self) -> bool {
751        self.should_resume(EventKind::ResponseHeaders, EventKind::ResponseBody)
752    }
753
754    fn is_event(&self, kind: EventKind) -> bool {
755        self.inner
756            .first_event
757            .borrow()
758            .as_ref()
759            .map(|d| d.event_kind() == kind)
760            .unwrap_or(false)
761    }
762}
763
764impl<S: Event> Drop for Exchange<S> {
765    fn drop(&mut self) {
766        let reactor = &self.reactor;
767        let host = &self.host;
768        if !reactor.is_done()
769            && reactor.paused()
770            && ((!reactor.cancelled_request() && reactor.phase() == ExchangePhase::Request)
771                || (!reactor.cancelled_response() && reactor.phase() == ExchangePhase::Response))
772        {
773            match reactor.phase() {
774                ExchangePhase::Request => {
775                    if self.should_resume_request() {
776                        reactor.set_paused(false);
777                        reactor.set_eos_paused(false);
778                        host.set_effective_context(reactor.context_id().into());
779                        host.resume_http_request()
780                    }
781                }
782                ExchangePhase::Response => {
783                    if self.should_resume_response() {
784                        reactor.set_paused(false);
785                        reactor.set_eos_paused(false);
786                        host.set_effective_context(reactor.context_id().into());
787                        host.resume_http_response()
788                    }
789                }
790            }
791        }
792    }
793}
794
795impl<S> EventData<'_, S>
796where
797    S: BodyEvent,
798{
799    pub fn offset(&self) -> usize {
800        self.offset
801    }
802
803    pub fn chunk_size(&self) -> usize {
804        self.event.body_size()
805    }
806
807    pub fn read_body(&self, offset: usize, max_size: usize) -> Vec<u8> {
808        S::read_body(self.exchange.host.deref(), offset, max_size).unwrap_or_default()
809    }
810
811    pub fn read_chunk(&self) -> Vec<u8> {
812        self.read_body(self.offset, self.event.body_size())
813    }
814
815    pub fn read_payload(&self) -> Vec<u8> {
816        self.read_body(0, self.event.body_size())
817    }
818}
819
820/// Manages the access to the data for streamed events.
821pub struct EventDataStream<'e, S: Event> {
822    exchange: Rc<InnerExchange<S>>,
823    id_and_waker: Option<(WakerId, Waker)>,
824    buffering: bool,
825    _lifetime: PhantomData<&'e ()>,
826}
827
828impl<'e, S: Event> EventDataStream<'e, S> {
829    fn process_event(&mut self, event: S) -> EventData<'e, S> {
830        let exchange = self.exchange.clone();
831        let reactor = exchange.reactor.as_ref();
832
833        let pause = self.buffering && event.should_pause() || reactor.eos_paused();
834
835        reactor.set_paused(pause);
836        let offset = exchange.offset.get();
837        exchange.offset.set(event.body_size());
838        exchange.empty_stream.set(!event.should_pause());
839        EventData::from_inner(exchange, event, offset)
840    }
841}
842
843impl<'e, S: Event> Stream for EventDataStream<'e, S> {
844    type Item = EventData<'e, S>;
845
846    fn poll_next(
847        mut self: std::pin::Pin<&mut Self>,
848        cx: &mut std::task::Context<'_>,
849    ) -> Poll<Option<Self::Item>> {
850        let reactor = self.exchange.reactor.clone();
851        let exchange = self.exchange.clone();
852
853        if exchange.empty_stream.get() {
854            if let Some((id, _)) = self.id_and_waker.take() {
855                // Deregister the waker from the reactor.
856                reactor.remove_waker(S::kind(), id);
857            }
858            return Poll::Ready(None);
859        }
860
861        if reactor.current_event() >= S::kind() {
862            let event_data = if let Some(event) = exchange.take_first_event() {
863                if let Some((id, _)) = self.id_and_waker.take() {
864                    // Deregister the waker from the reactor.
865                    reactor.remove_waker(S::kind(), id);
866                }
867                Some(self.process_event(event))
868            } else {
869                let event_count = reactor.event_count();
870
871                // Ensure that repeated events are not the same
872                if event_count > exchange.event_count.get() {
873                    exchange.event_count.set(event_count);
874                    if let Ok(event) = S::try_from(reactor.cloned_finite_event()) {
875                        Some(self.process_event(event))
876                    } else {
877                        None
878                    }
879                } else {
880                    return Poll::Pending;
881                }
882            };
883
884            Poll::Ready(event_data)
885        } else {
886            match &self.id_and_waker {
887                None => {
888                    // Register the waker in the reactor.
889                    let id = reactor.insert_waker(S::kind(), cx.waker().clone());
890                    self.id_and_waker = Some((id, cx.waker().clone()));
891                }
892                Some((id, w)) if !w.will_wake(cx.waker()) => {
893                    // Deregister the waker from the reactor to remove the old waker.
894                    reactor.remove_waker(S::kind(), *id);
895
896                    // Register the waker in the reactor with the new waker.
897                    let id = reactor.insert_waker(S::kind(), cx.waker().clone());
898                    self.id_and_waker = Some((id, cx.waker().clone()));
899                }
900                Some(_) => {}
901            }
902            Poll::Pending
903        }
904    }
905}