pdk_classy/
event.rs

1// Copyright (c) 2025, Salesforce, Inc.,
2// All rights reserved.
3// For full license text, see the LICENSE.txt file
4
5//! Abstractions of the proxy wasm underlying events.
6
7use std::{
8    cell::{Cell, RefCell},
9    convert::TryFrom,
10    marker::PhantomData,
11    ops::Deref,
12    rc::Rc,
13    task::{Poll, Waker},
14};
15
16use futures::{Stream, StreamExt};
17
18use crate::{
19    host::Host,
20    reactor::http::{HttpReactor, WakerId},
21    types::HttpCid,
22};
23use crate::{
24    http_constants::{
25        DEFAULT_PATH, HEADER_AUTHORITY, HEADER_METHOD, HEADER_PATH, HEADER_SCHEME, HEADER_STATUS,
26    },
27    reactor::http::ExchangePhase,
28};
29
30mod private {
31    use crate::host::Host;
32
33    pub trait Sealed {}
34
35    pub trait BodyAccessor {
36        fn read_body(host: &dyn Host, offset: usize, max_size: usize) -> Option<Vec<u8>>;
37
38        fn write_body(host: &dyn Host, offset: usize, size: usize, value: &[u8]);
39    }
40}
41
42use private::{BodyAccessor, Sealed};
43
44/// Common interface that all underlying events share.
45pub trait Event: Sealed + Clone + Into<FiniteEvent> + TryFrom<FiniteEvent> + Unpin {
46    fn kind() -> EventKind;
47
48    fn body_size(&self) -> usize;
49
50    fn should_pause(&self) -> bool;
51}
52
53/// Imposes an order of succession between two Events.
54pub trait After<S: Event>: Event {}
55
56/// Imposes an order of precedence between two Events.
57pub trait Before<S: Event>: Event {}
58
59impl<A, B> Before<B> for A
60where
61    B: After<A>,
62    A: Event,
63{
64}
65
66/// Events that can access the body and verify if more body remains.
67pub trait BodyEvent: Event + BodyAccessor {
68    fn end_of_stream(&self) -> bool;
69}
70
71/// Events that can access the headers.
72pub trait HeadersEvent: Event {}
73
74/// Alias name for CreateContext event
75#[derive(Clone, Debug)]
76pub struct Start {
77    pub(crate) _context_id: HttpCid,
78}
79
80#[derive(Clone, Debug)]
81/// Represents the state where the request headers are available.
82pub struct RequestHeaders {
83    pub(crate) _num_headers: usize,
84    pub(crate) end_of_stream: bool,
85}
86
87impl HeadersEvent for RequestHeaders {}
88
89#[derive(Clone, Debug)]
90/// Represents the state where the request body is available.
91pub struct RequestBody {
92    pub(crate) body_size: usize,
93    pub(crate) end_of_stream: bool,
94}
95
96impl BodyEvent for RequestBody {
97    fn end_of_stream(&self) -> bool {
98        self.end_of_stream
99    }
100}
101
102impl BodyAccessor for RequestBody {
103    fn read_body(host: &dyn Host, offset: usize, max_size: usize) -> Option<Vec<u8>> {
104        host.get_http_request_body(offset, max_size)
105    }
106
107    fn write_body(host: &dyn Host, offset: usize, size: usize, value: &[u8]) {
108        host.set_http_request_body(offset, size, value)
109    }
110}
111
112#[cfg(feature = "experimental_enable_stop_iteration")]
113impl HeadersAccessor for EventDataStream<'_, RequestBody> {
114    fn header(&self, name: &str) -> Option<String> {
115        self.exchange.host.get_http_request_header(name)
116    }
117
118    fn headers(&self) -> Vec<(String, String)> {
119        self.exchange.host.get_http_request_headers()
120    }
121
122    fn add_header(&self, name: &str, value: &str) {
123        self.exchange.host.add_http_request_header(name, value);
124    }
125
126    fn set_header(&self, name: &str, value: &str) {
127        self.exchange
128            .host
129            .set_http_request_header(name, Some(value));
130    }
131
132    fn set_headers(&self, headers: Vec<(&str, &str)>) {
133        self.exchange.host.set_http_request_headers(headers);
134    }
135
136    fn remove_header(&self, name: &str) {
137        self.exchange.host.set_http_request_header(name, None);
138    }
139}
140
141#[cfg(feature = "experimental_enable_stop_iteration")]
142impl HeadersAccessor for EventDataStream<'_, ResponseBody> {
143    fn header(&self, name: &str) -> Option<String> {
144        self.exchange.host.get_http_response_header(name)
145    }
146
147    fn headers(&self) -> Vec<(String, String)> {
148        self.exchange.host.get_http_response_headers()
149    }
150
151    fn add_header(&self, name: &str, value: &str) {
152        self.exchange.host.add_http_response_header(name, value);
153    }
154
155    fn set_header(&self, name: &str, value: &str) {
156        self.exchange
157            .host
158            .set_http_response_header(name, Some(value));
159    }
160
161    fn set_headers(&self, headers: Vec<(&str, &str)>) {
162        self.exchange.host.set_http_response_headers(headers);
163    }
164
165    fn remove_header(&self, name: &str) {
166        self.exchange.host.set_http_response_header(name, None);
167    }
168}
169
170#[derive(Clone, Debug)]
171/// Represents the state where the request trailers are available.
172pub struct RequestTrailers {
173    pub(crate) _num_trailers: usize,
174}
175
176impl HeadersEvent for RequestTrailers {}
177
178#[derive(Clone, Debug)]
179/// Represents the state where the response headers are available.
180pub struct ResponseHeaders {
181    pub(crate) _num_headers: usize,
182    pub(crate) end_of_stream: bool,
183}
184
185impl HeadersEvent for ResponseHeaders {}
186
187#[derive(Clone, Debug)]
188/// Represents the state where the response body is available.
189pub struct ResponseBody {
190    pub(crate) body_size: usize,
191    pub(crate) end_of_stream: bool,
192}
193
194impl BodyEvent for ResponseBody {
195    fn end_of_stream(&self) -> bool {
196        self.end_of_stream
197    }
198}
199
200impl BodyAccessor for ResponseBody {
201    fn read_body(host: &dyn Host, offset: usize, max_size: usize) -> Option<Vec<u8>> {
202        host.get_http_response_body(offset, max_size)
203    }
204
205    fn write_body(host: &dyn Host, offset: usize, size: usize, value: &[u8]) {
206        host.set_http_response_body(offset, size, value)
207    }
208}
209
210#[derive(Clone, Debug)]
211/// Represents the state where the response trailers are available.
212pub struct ResponseTrailers {
213    pub(crate) _num_trailers: usize,
214}
215
216impl HeadersEvent for ResponseTrailers {}
217
218#[derive(Clone, Debug)]
219/// Represents the state where the request has finished processing.
220pub struct ExchangeComplete {}
221
222macro_rules! should_pause {
223    (RequestBody, $value:expr) => {
224        !$value.end_of_stream
225    };
226
227    (ResponseBody, $value:expr) => {
228        !$value.end_of_stream
229    };
230
231    ($event:ty, $value:expr) => {
232        false
233    };
234}
235
236macro_rules! body_size {
237    (RequestBody, $value:expr) => {
238        $value.body_size
239    };
240
241    (ResponseBody, $value:expr) => {
242        $value.body_size
243    };
244
245    ($event:ty, $value:expr) => {
246        0
247    };
248}
249
250// Implements After trait for an Event
251macro_rules! impl_after {
252
253    ($event:ty) => {};
254
255    ($event:ty, $($after:ty),*) => {
256        $(impl After<$after> for $event {})*
257        impl_after!($($after),*);
258    };
259}
260
261// Implements After trait for an ordered sequence of events
262macro_rules! after {
263
264    ([] $($reversed:tt)*) => {
265        impl_after!($($reversed),*); // base case
266    };
267
268    ([$first:tt $($rest:tt)*] $($reversed:tt)*) => {
269        after!([$($rest)*] $first $($reversed)*);  // recursion
270    };
271}
272
273macro_rules! finite_events {
274
275    ($($event:ident,)+) => {
276
277        #[derive(Clone, Copy, PartialEq, Eq, Debug)]
278        /// Represents the data types of the events that can be processed.
279        pub enum EventKind {
280            $($event),+
281        }
282
283        #[derive(Clone, Debug)]
284        /// Types of the events that can be processed.
285        pub enum FiniteEvent {
286            $($event($event)),+
287        }
288
289        impl FiniteEvent {
290            pub fn kind(&self) -> EventKind {
291                match self {
292                    $(Self::$event(_) => EventKind::$event),+
293                }
294            }
295        }
296
297        after!([$($event)*]);
298
299        $(
300            impl Sealed for $event {}
301
302            impl Event for $event {
303                fn kind() -> EventKind {
304                    EventKind::$event
305                }
306
307                fn should_pause(&self) -> bool {
308                    should_pause!($event, self)
309                }
310
311                fn body_size(&self) -> usize {
312                    body_size!($event, self)
313                }
314            }
315
316            impl From<$event> for FiniteEvent {
317                fn from(event: $event) -> Self {
318                    Self::$event(event)
319                }
320            }
321
322            impl TryFrom<FiniteEvent> for $event {
323                type Error = FiniteEvent;
324
325                fn try_from(finite_event: FiniteEvent) -> Result<Self, FiniteEvent> {
326                    match finite_event {
327                        FiniteEvent::$event(e) => Ok(e),
328                        e => Err(e),
329                    }
330                }
331            }
332        )*
333    };
334}
335
336finite_events! {
337    Start,
338    RequestHeaders,
339    RequestBody,
340    RequestTrailers,
341    ResponseHeaders,
342    ResponseBody,
343    ResponseTrailers,
344    ExchangeComplete,
345}
346
347impl PartialOrd for EventKind {
348    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
349        Some(self.cmp(other))
350    }
351}
352
353impl Ord for EventKind {
354    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
355        self.index().cmp(&other.index())
356    }
357}
358
359impl EventKind {
360    fn index(&self) -> usize {
361        *self as usize
362    }
363}
364
365struct InnerExchange<S: Event> {
366    reactor: Rc<HttpReactor>,
367    host: Rc<dyn Host>,
368    first_event: RefCell<Option<S>>,
369    empty_stream: Cell<bool>,
370    offset: Cell<usize>,
371    event_count: Cell<usize>,
372}
373
374impl<S: Event> InnerExchange<S> {
375    fn take_first_event(&self) -> Option<S> {
376        self.first_event.borrow_mut().take()
377    }
378}
379
380/// A state machine which tracks the current state of the filter.
381pub struct Exchange<S: Event = Start> {
382    pub(crate) reactor: Rc<HttpReactor>,
383    pub(crate) host: Rc<dyn Host>,
384    inner: Rc<InnerExchange<S>>,
385}
386
387/// Manages the access to the data of each event.
388pub struct EventData<'a, S: Event> {
389    exchange: Rc<InnerExchange<S>>,
390    pub(crate) event: S,
391    offset: usize,
392    _lifetime: PhantomData<&'a ()>,
393}
394
395impl<'a, S: Event> EventData<'a, S> {
396    pub(crate) fn new(exchange: &'a Exchange<S>, event: S, offset: usize) -> Self {
397        Self {
398            exchange: exchange.inner.clone(),
399            event,
400            offset,
401            _lifetime: PhantomData,
402        }
403    }
404
405    fn from_inner(exchange: Rc<InnerExchange<S>>, event: S, offset: usize) -> Self {
406        Self {
407            exchange,
408            event,
409            offset,
410            _lifetime: PhantomData,
411        }
412    }
413}
414
415/// Trait that provides access to the headers of an event.
416pub trait HeadersAccessor {
417    /// Return the first header value for the given name.
418    /// Known Limitations: The header value will be converted to an utf-8 String
419    /// If the bytes correspond to a non utf-8 string they will be parsed as an iso_8859_1 encoding.
420    fn header(&self, name: &str) -> Option<String>;
421
422    /// Returns a copy of all the headers.
423    /// Known Limitations: The header values will be converted to utf-8 Strings
424    /// If the bytes correspond to a non utf-8 string they will be parsed as an iso_8859_1 encoding.
425    fn headers(&self) -> Vec<(String, String)>;
426
427    /// Adds a new value of the header to the event.
428    fn add_header(&self, name: &str, value: &str);
429
430    /// Replaces value of the header to the event.
431    fn set_header(&self, name: &str, value: &str);
432
433    /// Replaces all the headers of the event.
434    fn set_headers(&self, headers: Vec<(&str, &str)>);
435
436    /// Removes the specified header.
437    fn remove_header(&self, name: &str);
438}
439
440impl EventData<'_, RequestHeaders> {
441    pub fn method(&self) -> String {
442        self.header(HEADER_METHOD).unwrap_or_default()
443    }
444
445    pub fn scheme(&self) -> String {
446        self.header(HEADER_SCHEME).unwrap_or_default()
447    }
448
449    pub fn authority(&self) -> String {
450        self.header(HEADER_AUTHORITY).unwrap_or_default()
451    }
452
453    pub fn path(&self) -> String {
454        self.header(HEADER_PATH)
455            .unwrap_or_else(|| DEFAULT_PATH.to_string())
456    }
457}
458
459impl EventData<'_, ResponseHeaders> {
460    pub fn status_code(&self) -> u32 {
461        self.header(HEADER_STATUS)
462            .and_then(|status| status.parse::<u32>().ok())
463            .unwrap_or_default()
464    }
465}
466
467impl HeadersAccessor for EventData<'_, RequestHeaders> {
468    fn header(&self, name: &str) -> Option<String> {
469        self.exchange.host.get_http_request_header(name)
470    }
471
472    fn headers(&self) -> Vec<(String, String)> {
473        self.exchange.host.get_http_request_headers()
474    }
475
476    fn add_header(&self, name: &str, value: &str) {
477        self.exchange.host.add_http_request_header(name, value);
478    }
479
480    fn set_header(&self, name: &str, value: &str) {
481        self.exchange
482            .host
483            .set_http_request_header(name, Some(value));
484    }
485
486    fn set_headers(&self, headers: Vec<(&str, &str)>) {
487        self.exchange.host.set_http_request_headers(headers);
488    }
489
490    fn remove_header(&self, name: &str) {
491        self.exchange.host.set_http_request_header(name, None);
492    }
493}
494
495impl EventData<'_, RequestTrailers> {
496    pub fn header(&self, name: &str) -> Option<String> {
497        self.exchange.host.get_http_request_trailer(name)
498    }
499
500    pub fn headers(&self) -> Vec<(String, String)> {
501        self.exchange.host.get_http_request_trailers()
502    }
503}
504
505impl HeadersAccessor for EventData<'_, ResponseHeaders> {
506    fn header(&self, name: &str) -> Option<String> {
507        self.exchange.host.get_http_response_header(name)
508    }
509
510    fn headers(&self) -> Vec<(String, String)> {
511        self.exchange.host.get_http_response_headers()
512    }
513
514    fn add_header(&self, name: &str, value: &str) {
515        self.exchange.host.add_http_response_header(name, value);
516    }
517
518    fn set_header(&self, name: &str, value: &str) {
519        self.exchange
520            .host
521            .set_http_response_header(name, Some(value));
522    }
523
524    fn set_headers(&self, headers: Vec<(&str, &str)>) {
525        self.exchange.host.set_http_response_headers(headers);
526    }
527
528    fn remove_header(&self, name: &str) {
529        self.exchange.host.set_http_response_header(name, None);
530    }
531}
532
533impl<S: Event> Exchange<S> {
534    pub(crate) fn new(
535        reactor: Rc<HttpReactor>,
536        host: Rc<dyn Host>,
537        first_event: Option<S>,
538    ) -> Self {
539        let empty_stream = first_event
540            .as_ref()
541            .map(|e| !e.should_pause())
542            .unwrap_or(false);
543        Self {
544            reactor: reactor.clone(),
545            host: host.clone(),
546            inner: Rc::new(InnerExchange {
547                reactor,
548                host,
549                first_event: RefCell::new(first_event),
550                empty_stream: Cell::new(empty_stream),
551                event_count: Cell::new(0),
552                offset: Cell::new(0),
553            }),
554        }
555    }
556
557    pub fn event_data(&self) -> Option<EventData<S>>
558    where
559        S: HeadersEvent,
560    {
561        let finite_event = self.reactor.cloned_finite_event();
562        S::try_from(finite_event)
563            .ok()
564            .map(|e| EventData::new(self, e, 0))
565    }
566
567    #[must_use]
568    pub fn event_data_stream(&self) -> EventDataStream<S>
569    where
570        S: BodyEvent,
571    {
572        EventDataStream {
573            id_and_waker: None,
574            exchange: self.inner.clone(),
575            buffering: true,
576            _lifetime: PhantomData,
577        }
578    }
579
580    pub(crate) fn static_event_data_stream(&self) -> EventDataStream<'static, S>
581    where
582        S: BodyEvent,
583    {
584        EventDataStream {
585            id_and_waker: None,
586            exchange: self.inner.clone(),
587            buffering: false,
588            _lifetime: PhantomData,
589        }
590    }
591
592    pub(crate) async fn wait_for_event<E>(self) -> Exchange<E>
593    where
594        E: Event,
595        S: Before<E>,
596    {
597        self.wait_for_event_buffering(true).await
598    }
599
600    pub(crate) async fn wait_for_event_buffering<E>(self, buffering: bool) -> Exchange<E>
601    where
602        E: Event,
603        S: Before<E>,
604    {
605        let exchange: Exchange<E> =
606            Exchange::new(Rc::clone(&self.reactor), Rc::clone(&self.host), None);
607
608        // Ensure flow resume
609        drop(self);
610
611        let mut stream = EventDataStream {
612            id_and_waker: None,
613            exchange: exchange.inner.clone(),
614            buffering,
615            _lifetime: PhantomData,
616        };
617        let first_event = stream.next().await.map(|ed| ed.event);
618
619        *exchange.inner.first_event.borrow_mut() = first_event;
620        exchange.inner.offset.set(0);
621        exchange.inner.empty_stream.set(false);
622
623        exchange
624    }
625
626    pub async fn wait_for_request_headers(self) -> Exchange<RequestHeaders>
627    where
628        S: Before<RequestHeaders>,
629    {
630        self.wait_for_event().await
631    }
632
633    pub async fn wait_for_request_body(self) -> Exchange<RequestBody>
634    where
635        S: Before<RequestBody>,
636    {
637        self.wait_for_event().await
638    }
639
640    pub(crate) async fn _wait_for_request_trailers(self) -> Exchange<RequestTrailers>
641    where
642        S: Before<RequestTrailers>,
643    {
644        self.wait_for_event().await
645    }
646
647    pub async fn wait_for_response_headers(self) -> Exchange<ResponseHeaders>
648    where
649        S: Before<ResponseHeaders>,
650    {
651        self.wait_for_event().await
652    }
653
654    pub async fn wait_for_response_body(self) -> Exchange<ResponseBody>
655    where
656        S: Before<ResponseBody>,
657    {
658        self.wait_for_event().await
659    }
660
661    pub(crate) async fn _wait_for_response_trailers(self) -> Exchange<ResponseTrailers>
662    where
663        S: Before<ResponseTrailers>,
664    {
665        self.wait_for_event().await
666    }
667
668    pub(crate) async fn _wait_for_exchange_complete(self) -> Exchange<ExchangeComplete>
669    where
670        S: Before<ExchangeComplete>,
671    {
672        self.wait_for_event().await
673    }
674
675    pub fn send_response(self, status_code: u32, headers: Vec<(&str, &str)>, body: Option<&[u8]>)
676    where
677        S: After<Start> + Before<ResponseHeaders>,
678    {
679        self.host
680            .set_effective_context(self.reactor.context_id().into());
681        self.reactor.set_paused(true);
682        self.reactor.cancel_request();
683        self.host.send_http_response(status_code, headers, body);
684    }
685}
686
687impl<S: Event> Drop for Exchange<S> {
688    fn drop(&mut self) {
689        let reactor = &self.reactor;
690        let host = &self.host;
691        if !reactor.is_done() && reactor.paused() && !reactor.cancelled_request() {
692            reactor.set_paused(false);
693
694            host.set_effective_context(reactor.context_id().into());
695
696            match reactor.phase() {
697                ExchangePhase::Request => host.resume_http_request(),
698                ExchangePhase::Response => host.resume_http_response(),
699            }
700        }
701    }
702}
703
704impl<S> EventData<'_, S>
705where
706    S: BodyEvent,
707{
708    pub fn offset(&self) -> usize {
709        self.offset
710    }
711
712    pub fn chunk_size(&self) -> usize {
713        self.event.body_size()
714    }
715
716    pub fn read_body(&self, offset: usize, max_size: usize) -> Vec<u8> {
717        S::read_body(self.exchange.host.deref(), offset, max_size).unwrap_or_default()
718    }
719
720    pub fn read_chunk(&self) -> Vec<u8> {
721        self.read_body(self.offset, self.event.body_size())
722    }
723
724    pub fn read_payload(&self) -> Vec<u8> {
725        self.read_body(0, self.event.body_size())
726    }
727}
728
729/// Manages the access to the data for streamed events.
730pub struct EventDataStream<'e, S: Event> {
731    exchange: Rc<InnerExchange<S>>,
732    id_and_waker: Option<(WakerId, Waker)>,
733    buffering: bool,
734    _lifetime: PhantomData<&'e ()>,
735}
736
737impl<'e, S: Event> EventDataStream<'e, S> {
738    fn process_event(&mut self, event: S) -> EventData<'e, S> {
739        let exchange = self.exchange.clone();
740        let reactor = exchange.reactor.as_ref();
741        let pause = self.buffering && event.should_pause();
742        reactor.set_paused(pause);
743        let offset = exchange.offset.get();
744        exchange.offset.set(event.body_size());
745        exchange.empty_stream.set(!event.should_pause());
746        EventData::from_inner(exchange, event, offset)
747    }
748}
749
750impl<'e, S: Event> Stream for EventDataStream<'e, S> {
751    type Item = EventData<'e, S>;
752
753    fn poll_next(
754        mut self: std::pin::Pin<&mut Self>,
755        cx: &mut std::task::Context<'_>,
756    ) -> Poll<Option<Self::Item>> {
757        let reactor = self.exchange.reactor.clone();
758        let exchange = self.exchange.clone();
759
760        if exchange.empty_stream.get() {
761            if let Some((id, _)) = self.id_and_waker.take() {
762                // Deregister the waker from the reactor.
763                reactor.remove_waker(S::kind(), id);
764            }
765            return Poll::Ready(None);
766        }
767
768        if reactor.current_event() >= S::kind() {
769            let event_data = if let Some(event) = exchange.take_first_event() {
770                if let Some((id, _)) = self.id_and_waker.take() {
771                    // Deregister the waker from the reactor.
772                    reactor.remove_waker(S::kind(), id);
773                }
774                Some(self.process_event(event))
775            } else {
776                let event_count = reactor.event_count();
777
778                // Ensure that repeated events are not the same
779                if event_count > exchange.event_count.get() {
780                    exchange.event_count.set(event_count);
781                    if let Ok(event) = S::try_from(reactor.cloned_finite_event()) {
782                        Some(self.process_event(event))
783                    } else {
784                        None
785                    }
786                } else {
787                    return Poll::Pending;
788                }
789            };
790
791            Poll::Ready(event_data)
792        } else {
793            match &self.id_and_waker {
794                None => {
795                    // Register the waker in the reactor.
796                    let id = reactor.insert_waker(S::kind(), cx.waker().clone());
797                    self.id_and_waker = Some((id, cx.waker().clone()));
798                }
799                Some((id, w)) if !w.will_wake(cx.waker()) => {
800                    // Deregister the waker from the reactor to remove the old waker.
801                    reactor.remove_waker(S::kind(), *id);
802
803                    // Register the waker in the reactor with the new waker.
804                    let id = reactor.insert_waker(S::kind(), cx.waker().clone());
805                    self.id_and_waker = Some((id, cx.waker().clone()));
806                }
807                Some(_) => {}
808            }
809            Poll::Pending
810        }
811    }
812}