proxy_sdk/
dispatcher.rs

1#![allow(clippy::type_complexity)]
2
3use log::{debug, error, warn};
4
5use crate::{
6    check_concern,
7    context::{Context, RootContext},
8    downcast_box::DowncastBox,
9    grpc_call::GrpcCallResponse,
10    grpc_stream::{GrpcStreamClose, GrpcStreamHandle, GrpcStreamMessage},
11    hostcalls::{self, BufferType},
12    http::{
13        HttpContext, RequestBody, RequestHeaders, RequestTrailers, ResponseBody, ResponseHeaders,
14        ResponseTrailers,
15    },
16    http_call::HttpCallResponse,
17    property::envoy::Attributes,
18    queue::Queue,
19    stream::{DownstreamData, StreamClose, StreamContext, UpstreamData},
20    CloseType, FilterDataStatus, FilterHeadersStatus, FilterStreamStatus, FilterTrailersStatus,
21    GrpcCode,
22};
23use std::{
24    cell::{Cell, RefCell, RefMut},
25    collections::{hash_map::Entry, HashMap},
26    sync::{
27        atomic::{AtomicUsize, Ordering},
28        Mutex,
29    },
30};
31
32#[cfg(feature = "stream-metadata")]
33pub use crate::grpc_stream::{GrpcStreamInitialMetadata, GrpcStreamTrailingMetadata};
34
35thread_local! {
36    static DISPATCHER: Dispatcher = Dispatcher::new();
37}
38static DISPATCHER_GEN: AtomicUsize = AtomicUsize::new(0);
39
40pub(crate) fn reset() {
41    DISPATCHER_GEN.fetch_add(1, Ordering::Relaxed);
42    *ROOT_INIT.lock().unwrap() = None;
43}
44
45pub(crate) fn root_id() -> u32 {
46    DISPATCHER.with(|x| x.active_root_id.get())
47}
48
49fn dispatch<F, R>(f: F) -> R
50where
51    F: FnOnce(&Dispatcher) -> R,
52{
53    DISPATCHER.with(|d| {
54        let current_gen = DISPATCHER_GEN.load(Ordering::Relaxed);
55        if d.generation.get() != current_gen {
56            d.generation.set(current_gen);
57            d.reset();
58        }
59        f(d)
60    })
61}
62
63static ROOT_INIT: Mutex<Option<Box<dyn Fn() -> DowncastBox<dyn RootContext> + Send + Sync>>> =
64    Mutex::new(None);
65
66struct HttpCallback {
67    context_id: u32,
68    root_context_id: u32,
69    callback: Box<dyn FnOnce(&mut DowncastBox<dyn RootContext>, &HttpCallResponse)>,
70}
71
72struct GrpcCallback {
73    context_id: u32,
74    root_context_id: u32,
75    callback: Box<dyn FnOnce(&mut DowncastBox<dyn RootContext>, &GrpcCallResponse)>,
76}
77
78#[derive(Default)]
79struct GrpcStreamCallback {
80    context_id: u32,
81    root_context_id: u32,
82    close: Option<Box<dyn FnOnce(&mut DowncastBox<dyn RootContext>, &GrpcStreamClose)>>,
83    message: Option<
84        Box<dyn FnMut(&mut DowncastBox<dyn RootContext>, GrpcStreamHandle, &GrpcStreamMessage)>,
85    >,
86    #[cfg(feature = "stream-metadata")]
87    initial_meta: Option<
88        Box<
89            dyn FnMut(
90                &mut DowncastBox<dyn RootContext>,
91                GrpcStreamHandle,
92                &GrpcStreamInitialMetadata,
93            ),
94        >,
95    >,
96    #[cfg(feature = "stream-metadata")]
97    trailer_meta: Option<
98        Box<
99            dyn FnMut(
100                &mut DowncastBox<dyn RootContext>,
101                GrpcStreamHandle,
102                &GrpcStreamTrailingMetadata,
103            ),
104        >,
105    >,
106}
107
108struct StreamInfo {
109    parent_context_id: u32,
110    data: Box<dyn StreamContext>,
111}
112
113struct HttpStreamInfo {
114    parent_context_id: u32,
115    data: Box<dyn HttpContext>,
116}
117
118struct RootInfo {
119    data: DowncastBox<dyn RootContext>,
120}
121
122#[derive(Default)]
123struct Dispatcher {
124    roots: RefCell<HashMap<u32, RootInfo>>,
125    streams: RefCell<HashMap<u32, StreamInfo>>,
126    http_streams: RefCell<HashMap<u32, HttpStreamInfo>>,
127    http_callbacks: RefCell<HashMap<u32, HttpCallback>>,
128    grpc_callbacks: RefCell<HashMap<u32, GrpcCallback>>,
129    grpc_streams: RefCell<HashMap<u32, GrpcStreamCallback>>,
130    queue_callbacks:
131        RefCell<HashMap<u32, Box<dyn FnMut(&mut DowncastBox<dyn RootContext>, Queue)>>>,
132    active_id: Cell<u32>,
133    active_root_id: Cell<u32>,
134    generation: Cell<usize>,
135}
136
137impl Dispatcher {
138    fn reset(&self) {
139        self.roots.borrow_mut().clear();
140        self.streams.borrow_mut().clear();
141        self.http_streams.borrow_mut().clear();
142        self.http_callbacks.borrow_mut().clear();
143        self.grpc_callbacks.borrow_mut().clear();
144        self.grpc_streams.borrow_mut().clear();
145        self.queue_callbacks.borrow_mut().clear();
146        self.roots.borrow_mut().clear();
147        self.active_id.set(0);
148        self.active_root_id.set(0);
149    }
150
151    fn root<'a>(
152        roots: &'a mut RefMut<'_, HashMap<u32, RootInfo>>,
153        root_context_id: u32,
154    ) -> &'a mut DowncastBox<dyn RootContext> {
155        roots.entry(root_context_id).or_insert_with(|| RootInfo {
156            data: ROOT_INIT
157                .lock()
158                .unwrap()
159                .as_ref()
160                .expect("missing root_context_factory")(),
161        });
162        &mut roots.get_mut(&root_context_id).unwrap().data
163    }
164}
165
166/// Sets root context factory. Should be called from _init. Can only be called once.
167pub fn set_root_context_factory<R: RootContext + 'static>(root: fn() -> R) {
168    *ROOT_INIT.lock().unwrap() = Some(Box::new(move || DowncastBox::new(Box::new(root()))));
169}
170
171pub(crate) fn register_http_callback(
172    token: u32,
173    callback: Box<dyn FnOnce(&mut DowncastBox<dyn RootContext>, &HttpCallResponse)>,
174) {
175    dispatch(|d| {
176        d.http_callbacks.borrow_mut().insert(
177            token,
178            HttpCallback {
179                context_id: d.active_id.get(),
180                root_context_id: d.active_root_id.get(),
181                callback,
182            },
183        )
184    });
185}
186
187pub(crate) fn register_grpc_callback(
188    token: u32,
189    callback: Box<dyn FnOnce(&mut DowncastBox<dyn RootContext>, &GrpcCallResponse)>,
190) {
191    dispatch(|d| {
192        d.grpc_callbacks.borrow_mut().insert(
193            token,
194            GrpcCallback {
195                context_id: d.active_id.get(),
196                root_context_id: d.active_root_id.get(),
197                callback,
198            },
199        )
200    });
201}
202
203#[cfg(feature = "stream-metadata")]
204pub(crate) fn register_grpc_stream_initial_meta(
205    token: u32,
206    callback: Box<
207        dyn FnMut(&mut DowncastBox<dyn RootContext>, GrpcStreamHandle, &GrpcStreamInitialMetadata),
208    >,
209) {
210    dispatch(|d| {
211        let context_id = d.d.active_id.get();
212        let root_context_id = d.active_root_id.get();
213        match d.grpc_streams.borrow_mut().entry(token) {
214            Entry::Occupied(entry) if entry.get().context_id != context_id => {
215                error!(
216                    "mismatch in context for register_grpc_stream_initial_meta! {} != {}",
217                    entry.get().context_id,
218                    context_id
219                );
220            }
221            Entry::Occupied(mut entry) => {
222                entry.get_mut().initial_meta = Some(callback);
223            }
224            Entry::Vacant(entry) => {
225                entry.insert(GrpcStreamCallback {
226                    context_id,
227                    root_context_id,
228                    initial_meta: Some(callback),
229                    ..Default::default()
230                });
231            }
232        }
233    });
234}
235
236pub(crate) fn register_grpc_stream_message(
237    token: u32,
238    callback: Box<
239        dyn FnMut(&mut DowncastBox<dyn RootContext>, GrpcStreamHandle, &GrpcStreamMessage),
240    >,
241) {
242    dispatch(|d| {
243        let context_id = d.active_id.get();
244        let root_context_id = d.active_root_id.get();
245        match d.grpc_streams.borrow_mut().entry(token) {
246            Entry::Occupied(entry) if entry.get().context_id != context_id => {
247                error!(
248                    "mismatch in context for register_grpc_stream_message! {} != {}",
249                    entry.get().context_id,
250                    context_id
251                );
252            }
253            Entry::Occupied(mut entry) => {
254                entry.get_mut().message = Some(callback);
255            }
256            Entry::Vacant(entry) => {
257                entry.insert(GrpcStreamCallback {
258                    context_id,
259                    root_context_id,
260                    message: Some(callback),
261                    ..Default::default()
262                });
263            }
264        }
265    });
266}
267
268#[cfg(feature = "stream-metadata")]
269pub(crate) fn register_grpc_stream_trailing_metadata(
270    token: u32,
271    callback: Box<
272        dyn FnMut(&mut DowncastBox<dyn RootContext>, GrpcStreamHandle, &GrpcStreamTrailingMetadata),
273    >,
274) {
275    dispatch(|d| {
276        let context_id = d.active_id.get();
277        let root_context_id = d.active_root_id.get();
278        match d.grpc_streams.borrow_mut().entry(token) {
279            Entry::Occupied(entry) if entry.get().context_id != context_id => {
280                error!(
281                    "mismatch in context for register_grpc_stream_trailing_metadata! {} != {}",
282                    entry.get().context_id,
283                    context_id
284                );
285            }
286            Entry::Occupied(mut entry) => {
287                entry.get_mut().trailer_meta = Some(callback);
288            }
289            Entry::Vacant(entry) => {
290                entry.insert(GrpcStreamCallback {
291                    context_id,
292                    root_context_id,
293                    trailer_meta: Some(callback),
294                    ..Default::default()
295                });
296            }
297        }
298    });
299}
300
301pub(crate) fn register_grpc_stream_close(
302    token: u32,
303    callback: Box<dyn FnOnce(&mut DowncastBox<dyn RootContext>, &GrpcStreamClose)>,
304) {
305    dispatch(|d| {
306        let context_id = d.active_id.get();
307        let root_context_id = d.active_root_id.get();
308        match d.grpc_streams.borrow_mut().entry(token) {
309            Entry::Occupied(entry) if entry.get().context_id != context_id => {
310                error!(
311                    "mismatch in context for register_grpc_stream_close! {} != {}",
312                    entry.get().context_id,
313                    context_id
314                );
315            }
316            Entry::Occupied(mut entry) => {
317                entry.get_mut().close = Some(callback);
318            }
319            Entry::Vacant(entry) => {
320                entry.insert(GrpcStreamCallback {
321                    context_id,
322                    root_context_id,
323                    close: Some(callback),
324                    ..Default::default()
325                });
326            }
327        }
328    })
329}
330
331pub(crate) fn register_queue_callback<R: RootContext + 'static>(
332    token: u32,
333    mut callback: impl FnMut(&mut R, Queue) + 'static,
334) {
335    dispatch(|d| {
336        d.queue_callbacks.borrow_mut().insert(
337            token,
338            Box::new(move |root, queue| {
339                callback(
340                    root.as_any_mut().downcast_mut().expect("invalid root type"),
341                    queue,
342                )
343            }),
344        );
345    })
346}
347
348struct EffectiveContext {
349    name: &'static str,
350    prior: u32,
351    prior_root: u32,
352}
353
354impl EffectiveContext {
355    pub fn enter(id: u32, root_id: u32, name: &'static str) -> Option<Self> {
356        if let Err(e) = hostcalls::set_effective_context(id) {
357            debug!("failed to assume context {root_id}/{id} for {name}: {e:?}");
358            return None;
359        };
360        let (prior, prior_root) = dispatch(|d| {
361            let prior = d.active_id.get();
362            d.active_id.set(id);
363            let prior_root = d.active_root_id.get();
364            d.active_root_id.set(root_id);
365            (prior, prior_root)
366        });
367        Some(Self {
368            name,
369            prior,
370            prior_root,
371        })
372    }
373}
374
375impl Drop for EffectiveContext {
376    fn drop(&mut self) {
377        if let Err(e) = hostcalls::set_effective_context(self.prior) {
378            debug!("failed to reset context for {}: {e:?}", self.name);
379        };
380        dispatch(|d| {
381            d.active_id.set(self.prior);
382            d.active_root_id.set(self.prior_root);
383        });
384    }
385}
386
387impl Dispatcher {
388    fn new() -> Dispatcher {
389        Self::default()
390    }
391
392    fn do_create_subcontext(&self, root_context_id: u32, context_id: u32) {
393        let mut roots = self.roots.borrow_mut();
394        let root = Self::root(&mut roots, root_context_id);
395        match root.create_context() {
396            Context::Http(context) => {
397                if self
398                    .http_streams
399                    .borrow_mut()
400                    .insert(
401                        context_id,
402                        HttpStreamInfo {
403                            parent_context_id: root_context_id,
404                            data: context,
405                        },
406                    )
407                    .is_some()
408                {
409                    warn!("reused context_id without proper cleanup");
410                }
411            }
412            Context::Stream(context) => {
413                if self
414                    .streams
415                    .borrow_mut()
416                    .insert(
417                        context_id,
418                        StreamInfo {
419                            parent_context_id: root_context_id,
420                            data: context,
421                        },
422                    )
423                    .is_some()
424                {
425                    warn!("reused context_id without proper cleanup");
426                }
427            }
428        }
429    }
430
431    fn on_create_context(&self, context_id: u32, parent_context_id: u32) {
432        if parent_context_id == 0 {
433            let mut roots = self.roots.borrow_mut();
434            Self::root(&mut roots, context_id);
435        } else if self.roots.borrow().contains_key(&parent_context_id) {
436            self.do_create_subcontext(parent_context_id, context_id);
437        } else {
438            warn!("attempted to create context {context_id} under unknown context {parent_context_id}");
439        }
440    }
441
442    fn on_done(&self, context_id: u32) -> bool {
443        if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) {
444            self.active_id.set(context_id);
445            self.active_root_id.set(http_stream.parent_context_id);
446            http_stream.data.on_done()
447        } else if let Some(stream) = self.streams.borrow_mut().get_mut(&context_id) {
448            self.active_id.set(context_id);
449            self.active_root_id.set(stream.parent_context_id);
450            stream.data.on_done()
451        } else if self.roots.borrow().contains_key(&context_id) {
452            self.active_id.set(context_id);
453            self.active_root_id.set(context_id);
454            let mut roots = self.roots.borrow_mut();
455            Self::root(&mut roots, context_id).on_done()
456        } else {
457            warn!("on_done called on unknown context: {context_id}");
458            true
459        }
460    }
461
462    fn on_log(&self, context_id: u32) {
463        if let Some(http_stream) = self.http_streams.borrow_mut().get_mut(&context_id) {
464            self.active_id.set(context_id);
465            self.active_root_id.set(http_stream.parent_context_id);
466            http_stream.data.on_log();
467        } else if let Some(stream) = self.streams.borrow_mut().get_mut(&context_id) {
468            self.active_id.set(context_id);
469            self.active_root_id.set(stream.parent_context_id);
470            stream.data.on_log();
471        } else if self.roots.borrow().contains_key(&context_id) {
472            self.active_id.set(context_id);
473            self.active_root_id.set(context_id);
474            let mut roots = self.roots.borrow_mut();
475            Self::root(&mut roots, context_id).on_log();
476        } else {
477            warn!("on_log called on unknown context: {context_id}");
478        }
479    }
480
481    fn on_delete(&self, context_id: u32) {
482        if self.http_streams.borrow_mut().remove(&context_id).is_some() {
483            return;
484        }
485        if self.streams.borrow_mut().remove(&context_id).is_some() {
486            return;
487        }
488        if self.roots.borrow_mut().remove(&context_id).is_some() {
489            return;
490        }
491        warn!("deleting unknown context_id {context_id}");
492    }
493
494    fn on_vm_start(&self, context_id: u32, vm_configuration_size: usize) -> bool {
495        if !self.roots.borrow().contains_key(&context_id) {
496            warn!("received on_vm_start for non-root-context: {context_id}");
497            return true;
498        }
499        let Some(configuration) = check_concern(
500            "vm-start-config",
501            hostcalls::get_buffer(BufferType::VmConfiguration, 0, vm_configuration_size),
502        ) else {
503            return false;
504        };
505        self.active_id.set(context_id);
506        self.active_root_id.set(context_id);
507        let mut roots = self.roots.borrow_mut();
508        Self::root(&mut roots, context_id).on_vm_start(configuration)
509    }
510
511    fn on_configure(&self, context_id: u32, plugin_configuration_size: usize) -> bool {
512        if !self.roots.borrow().contains_key(&context_id) {
513            warn!("received on_configure for non-root-context: {context_id}");
514            return true;
515        }
516        let Some(configuration) = check_concern(
517            "configure-fetch",
518            hostcalls::get_buffer(
519                BufferType::PluginConfiguration,
520                0,
521                plugin_configuration_size,
522            ),
523        ) else {
524            return false;
525        };
526        self.active_id.set(context_id);
527        self.active_root_id.set(context_id);
528        let mut roots = self.roots.borrow_mut();
529        Self::root(&mut roots, context_id).on_configure(configuration)
530    }
531
532    fn on_tick(&self, context_id: u32) {
533        if !self.roots.borrow().contains_key(&context_id) {
534            warn!("received on_tick for non-root-context: {context_id}");
535            return;
536        }
537        self.active_id.set(context_id);
538        self.active_root_id.set(context_id);
539        let mut roots = self.roots.borrow_mut();
540        Self::root(&mut roots, context_id).on_tick();
541    }
542
543    fn on_queue_ready(&self, context_id: u32, queue_id: u32) {
544        if !self.roots.borrow().contains_key(&context_id) {
545            warn!("received on_queue_ready for non-root-context: {context_id}");
546            return;
547        }
548        if let Some(callback) = self.queue_callbacks.borrow_mut().get_mut(&queue_id) {
549            let mut roots = self.roots.borrow_mut();
550            callback(
551                &mut roots.get_mut(&context_id).unwrap().data,
552                Queue(queue_id),
553            );
554        }
555    }
556
557    fn on_new_connection(&self, context_id: u32) -> FilterStreamStatus {
558        let mut streams = self.streams.borrow_mut();
559        let stream = if let Some(context) = streams.get_mut(&context_id) {
560            context
561        } else {
562            // self.do_create_subcontext(context_id);
563            // let Some(context) = self.streams.get_mut(&context_id) else {
564            warn!(
565                "no http context found for context (and was not implicitly created): {context_id}"
566            );
567            return FilterStreamStatus::Continue;
568            // };
569            // context
570        };
571        self.active_id.set(context_id);
572        self.active_root_id.set(stream.parent_context_id);
573        stream.data.on_new_connection()
574    }
575
576    fn on_downstream_data(
577        &self,
578        context_id: u32,
579        data_size: usize,
580        end_of_stream: bool,
581    ) -> FilterStreamStatus {
582        let mut streams = self.streams.borrow_mut();
583        let Some(stream) = streams.get_mut(&context_id) else {
584            return FilterStreamStatus::Continue;
585        };
586        self.active_id.set(context_id);
587        self.active_root_id.set(stream.parent_context_id);
588        stream.data.on_downstream_data(&DownstreamData {
589            data_size,
590            end_of_stream,
591            attributes: Attributes::get(),
592        })
593    }
594
595    fn on_downstream_close(&self, context_id: u32, close_type: CloseType) {
596        let mut streams = self.streams.borrow_mut();
597        let Some(stream) = streams.get_mut(&context_id) else {
598            return;
599        };
600        self.active_id.set(context_id);
601        self.active_root_id.set(stream.parent_context_id);
602        stream.data.on_downstream_close(&StreamClose {
603            close_type,
604            attributes: Attributes::get(),
605        })
606    }
607
608    fn on_upstream_data(
609        &self,
610        context_id: u32,
611        data_size: usize,
612        end_of_stream: bool,
613    ) -> FilterStreamStatus {
614        let mut streams = self.streams.borrow_mut();
615        let Some(stream) = streams.get_mut(&context_id) else {
616            return FilterStreamStatus::Continue;
617        };
618        self.active_id.set(context_id);
619        self.active_root_id.set(stream.parent_context_id);
620        stream.data.on_upstream_data(&UpstreamData {
621            data_size,
622            end_of_stream,
623            attributes: Attributes::get(),
624        })
625    }
626
627    fn on_upstream_close(&self, context_id: u32, close_type: CloseType) {
628        let mut streams = self.streams.borrow_mut();
629        let Some(stream) = streams.get_mut(&context_id) else {
630            return;
631        };
632        self.active_id.set(context_id);
633        self.active_root_id.set(stream.parent_context_id);
634        stream.data.on_upstream_close(&StreamClose {
635            close_type,
636            attributes: Attributes::get(),
637        })
638    }
639
640    fn on_http_request_headers(
641        &self,
642        context_id: u32,
643        header_count: usize,
644        end_of_stream: bool,
645    ) -> FilterHeadersStatus {
646        let mut http_streams = self.http_streams.borrow_mut();
647        let context = if let Some(context) = http_streams.get_mut(&context_id) {
648            context
649        } else {
650            // self.do_create_subcontext(context_id);
651            // let Some(context) = self.http_streams.get_mut(&context_id) else {
652            warn!("no http context found for on_http_request_headers (and was not implicitly created): {context_id}");
653            return FilterHeadersStatus::Continue;
654            // };
655            // context
656        };
657        self.active_id.set(context_id);
658        self.active_root_id.set(context.parent_context_id);
659        context.data.on_http_request_headers(&RequestHeaders {
660            header_count,
661            end_of_stream,
662            attributes: Attributes::get(),
663        })
664    }
665
666    fn on_http_request_body(
667        &self,
668        context_id: u32,
669        body_size: usize,
670        end_of_stream: bool,
671    ) -> FilterDataStatus {
672        let mut http_streams = self.http_streams.borrow_mut();
673        let Some(context) = http_streams.get_mut(&context_id) else {
674            warn!("no http context found for on_http_request_body: {context_id}");
675            return FilterDataStatus::Continue;
676        };
677        self.active_id.set(context_id);
678        self.active_root_id.set(context.parent_context_id);
679        context.data.on_http_request_body(&RequestBody {
680            body_size,
681            end_of_stream,
682            attributes: Attributes::get(),
683        })
684    }
685
686    fn on_http_request_trailers(
687        &self,
688        context_id: u32,
689        trailer_count: usize,
690    ) -> FilterTrailersStatus {
691        let mut http_streams = self.http_streams.borrow_mut();
692        let Some(context) = http_streams.get_mut(&context_id) else {
693            warn!("no http context found for on_http_request_trailers: {context_id}");
694            return FilterTrailersStatus::Continue;
695        };
696        self.active_id.set(context_id);
697        self.active_root_id.set(context.parent_context_id);
698        context.data.on_http_request_trailers(&RequestTrailers {
699            trailer_count,
700            attributes: Attributes::get(),
701        })
702    }
703
704    fn on_http_response_headers(
705        &self,
706        context_id: u32,
707        header_count: usize,
708        end_of_stream: bool,
709    ) -> FilterHeadersStatus {
710        let mut http_streams = self.http_streams.borrow_mut();
711        let Some(context) = http_streams.get_mut(&context_id) else {
712            warn!("no http context found for on_http_response_headers: {context_id}");
713            return FilterHeadersStatus::Continue;
714        };
715        self.active_id.set(context_id);
716        self.active_root_id.set(context.parent_context_id);
717        context.data.on_http_response_headers(&ResponseHeaders {
718            header_count,
719            end_of_stream,
720            attributes: Attributes::get(),
721        })
722    }
723
724    fn on_http_response_body(
725        &self,
726        context_id: u32,
727        body_size: usize,
728        end_of_stream: bool,
729    ) -> FilterDataStatus {
730        let mut http_streams = self.http_streams.borrow_mut();
731        let Some(context) = http_streams.get_mut(&context_id) else {
732            warn!("no http context found for on_http_response_body: {context_id}");
733            return FilterDataStatus::Continue;
734        };
735        self.active_id.set(context_id);
736        self.active_root_id.set(context.parent_context_id);
737        context.data.on_http_response_body(&ResponseBody {
738            body_size,
739            end_of_stream,
740            attributes: Attributes::get(),
741        })
742    }
743
744    fn on_http_response_trailers(
745        &self,
746        context_id: u32,
747        trailer_count: usize,
748    ) -> FilterTrailersStatus {
749        let mut http_streams = self.http_streams.borrow_mut();
750        let Some(context) = http_streams.get_mut(&context_id) else {
751            warn!("no http context found for on_http_response_trailers: {context_id}");
752            return FilterTrailersStatus::Continue;
753        };
754        self.active_id.set(context_id);
755        self.active_root_id.set(context.parent_context_id);
756        context.data.on_http_response_trailers(&ResponseTrailers {
757            trailer_count,
758            attributes: Attributes::get(),
759        })
760    }
761
762    fn on_http_call_response(
763        &self,
764        token_id: u32,
765        num_headers: usize,
766        body_size: usize,
767        num_trailers: usize,
768    ) {
769        let Some(callback) = self.http_callbacks.borrow_mut().remove(&token_id) else {
770            debug!(
771                "received http_call_response for token {token_id}, but no callback was registered"
772            );
773            return;
774        };
775        let mut roots = self.roots.borrow_mut();
776        let Some(root) = roots.get_mut(&callback.root_context_id) else {
777            debug!("referenced non-existing root context");
778            return;
779        };
780        let Some(_ctx) = EffectiveContext::enter(
781            callback.context_id,
782            callback.root_context_id,
783            "http callback",
784        ) else {
785            return;
786        };
787        (callback.callback)(
788            &mut root.data,
789            &HttpCallResponse::new(num_headers, body_size, num_trailers),
790        );
791    }
792
793    #[cfg(feature = "stream-metadata")]
794    fn on_grpc_receive_initial_metadata(&self, token_id: u32, num_headers: u32) {
795        let mut grpc_streams = self.grpc_streams;
796        let Some(callback) = grpc_streams.get_mut(&token_id) else {
797            debug!("received grpc message for unknown token {token_id}");
798            return;
799        };
800        let Some(function) = &mut callback.initial_meta else {
801            return;
802        };
803        let mut roots = self.roots.borrow_mut();
804        let Some(root) = roots.get_mut(&callback.root_context_id) else {
805            debug!("referenced non-existing root context");
806            return;
807        };
808
809        let Some(_ctx) =
810            EffectiveContext::enter(callback.context_id, callback.root_context_id, "grpc stream")
811        else {
812            return;
813        };
814
815        function(
816            &mut root.data,
817            GrpcStreamHandle(token_id),
818            &GrpcStreamInitialMetadata::new(num_headers as usize),
819        );
820    }
821
822    fn on_grpc_receive(&self, token_id: u32, response_size: usize) {
823        if let Some(callback) = self.grpc_callbacks.borrow_mut().remove(&token_id) {
824            let mut roots = self.roots.borrow_mut();
825            let Some(root) = roots.get_mut(&callback.root_context_id) else {
826                debug!("referenced non-existing root context");
827                return;
828            };
829            let Some(_ctx) = EffectiveContext::enter(
830                callback.context_id,
831                callback.root_context_id,
832                "grpc callback",
833            ) else {
834                return;
835            };
836
837            (callback.callback)(
838                &mut root.data,
839                &GrpcCallResponse::new(token_id, GrpcCode::Ok, None, response_size),
840            );
841        } else if let Some(callback) = self.grpc_streams.borrow_mut().get_mut(&token_id) {
842            let Some(function) = &mut callback.message else {
843                return;
844            };
845            let mut roots = self.roots.borrow_mut();
846            let Some(root) = roots.get_mut(&callback.root_context_id) else {
847                debug!("referenced non-existing root context");
848                return;
849            };
850
851            let Some(_ctx) = EffectiveContext::enter(
852                callback.context_id,
853                callback.root_context_id,
854                "grpc stream",
855            ) else {
856                return;
857            };
858
859            function(
860                &mut root.data,
861                GrpcStreamHandle(token_id),
862                &GrpcStreamMessage::new(GrpcCode::Ok, None, response_size),
863            );
864        } else {
865            debug!("received grpc message for unknown token {token_id}");
866        }
867    }
868
869    #[cfg(feature = "stream-metadata")]
870    fn on_grpc_receive_trailing_metadata(&self, token_id: u32, num_headers: u32) {
871        let mut grpc_streams = self.grpc_streams.borrow_mut();
872        let Some(callback) = grpc_streams.get_mut(&token_id) else {
873            debug!("received grpc message for unknown token {token_id}");
874            return;
875        };
876        let Some(function) = &mut callback.trailer_meta else {
877            return;
878        };
879        let mut roots = self.roots.borrow_mut();
880        let Some(root) = roots.get_mut(&callback.root_context_id) else {
881            debug!("referenced non-existing root context");
882            return;
883        };
884        let Some(_ctx) =
885            EffectiveContext::enter(callback.context_id, callback.root_context_id, "grpc stream")
886        else {
887            return;
888        };
889
890        function(
891            &mut root.data,
892            GrpcStreamHandle(token_id),
893            &GrpcStreamTrailingMetadata::new(num_headers as usize),
894        );
895    }
896
897    fn on_grpc_close(&self, token_id: u32, status_code: u32) {
898        if let Some(callback) = self.grpc_callbacks.borrow_mut().remove(&token_id) {
899            let mut roots = self.roots.borrow_mut();
900            let Some(root) = roots.get_mut(&callback.root_context_id) else {
901                debug!("referenced non-existing root context");
902                return;
903            };
904            let Some(_ctx) = EffectiveContext::enter(
905                callback.context_id,
906                callback.root_context_id,
907                "grpc callback",
908            ) else {
909                return;
910            };
911            let Some((status, message)) =
912                check_concern("grpc-call-close-status", hostcalls::get_grpc_status())
913            else {
914                return;
915            };
916            if status != status_code {
917                warn!("status code mismatch for on_grpc_close");
918            }
919
920            (callback.callback)(
921                &mut root.data,
922                &GrpcCallResponse::new(token_id, status.into(), message, 0),
923            );
924        } else if let Some(callback) = self.grpc_streams.borrow_mut().remove(&token_id) {
925            let Some(function) = callback.close else {
926                return;
927            };
928            let mut roots = self.roots.borrow_mut();
929            let Some(root) = roots.get_mut(&callback.root_context_id) else {
930                debug!("referenced non-existing root context");
931                return;
932            };
933            let Some(_ctx) = EffectiveContext::enter(
934                callback.context_id,
935                callback.root_context_id,
936                "grpc stream",
937            ) else {
938                return;
939            };
940            let Some((status, message)) =
941                check_concern("grpc-stream-close-status", hostcalls::get_grpc_status())
942            else {
943                return;
944            };
945
946            function(
947                &mut root.data,
948                &GrpcStreamClose::new(token_id, status.into(), message),
949            );
950        } else {
951            debug!("received grpc close for unknown token {token_id}");
952        }
953    }
954}
955
956#[no_mangle]
957pub extern "C" fn proxy_on_context_create(context_id: usize, root_context_id: usize) {
958    dispatch(|d| d.on_create_context(context_id as u32, root_context_id as u32))
959}
960
961#[no_mangle]
962pub extern "C" fn proxy_on_done(context_id: usize) -> usize {
963    dispatch(|d| d.on_done(context_id as u32)) as usize
964}
965
966#[no_mangle]
967pub extern "C" fn proxy_on_log(context_id: usize) {
968    dispatch(|d| d.on_log(context_id as u32))
969}
970
971#[no_mangle]
972pub extern "C" fn proxy_on_delete(context_id: usize) {
973    dispatch(|d| d.on_delete(context_id as u32))
974}
975
976#[no_mangle]
977pub extern "C" fn proxy_on_vm_start(context_id: usize, vm_configuration_size: usize) -> usize {
978    dispatch(|d| d.on_vm_start(context_id as u32, vm_configuration_size)) as usize
979}
980
981#[no_mangle]
982pub extern "C" fn proxy_on_configure(context_id: usize, plugin_configuration_size: usize) -> usize {
983    dispatch(|d| d.on_configure(context_id as u32, plugin_configuration_size)) as usize
984}
985
986#[no_mangle]
987pub extern "C" fn proxy_on_tick(context_id: usize) {
988    dispatch(|d| d.on_tick(context_id as u32))
989}
990
991#[no_mangle]
992pub extern "C" fn proxy_on_queue_ready(context_id: usize, queue_id: usize) {
993    dispatch(|d| d.on_queue_ready(context_id as u32, queue_id as u32))
994}
995
996#[no_mangle]
997pub extern "C" fn proxy_on_new_connection(context_id: usize) -> FilterStreamStatus {
998    dispatch(|d| d.on_new_connection(context_id as u32))
999}
1000
1001#[no_mangle]
1002pub extern "C" fn proxy_on_downstream_data(
1003    context_id: usize,
1004    data_size: usize,
1005    end_of_stream: usize,
1006) -> FilterStreamStatus {
1007    dispatch(|d| d.on_downstream_data(context_id as u32, data_size, end_of_stream != 0))
1008}
1009
1010#[no_mangle]
1011pub extern "C" fn proxy_on_downstream_connection_close(context_id: usize, close_type: CloseType) {
1012    dispatch(|d| d.on_downstream_close(context_id as u32, close_type))
1013}
1014
1015#[no_mangle]
1016pub extern "C" fn proxy_on_upstream_data(
1017    context_id: usize,
1018    data_size: usize,
1019    end_of_stream: usize,
1020) -> FilterStreamStatus {
1021    dispatch(|d| d.on_upstream_data(context_id as u32, data_size, end_of_stream != 0))
1022}
1023
1024#[no_mangle]
1025pub extern "C" fn proxy_on_upstream_connection_close(context_id: usize, close_type: CloseType) {
1026    dispatch(|d| d.on_upstream_close(context_id as u32, close_type))
1027}
1028
1029#[no_mangle]
1030pub extern "C" fn proxy_on_request_headers(
1031    context_id: usize,
1032    num_headers: usize,
1033    end_of_stream: usize,
1034) -> FilterHeadersStatus {
1035    dispatch(|d| d.on_http_request_headers(context_id as u32, num_headers, end_of_stream != 0))
1036}
1037
1038#[no_mangle]
1039pub extern "C" fn proxy_on_request_body(
1040    context_id: usize,
1041    body_size: usize,
1042    end_of_stream: usize,
1043) -> FilterDataStatus {
1044    dispatch(|d| d.on_http_request_body(context_id as u32, body_size, end_of_stream != 0))
1045}
1046
1047#[no_mangle]
1048pub extern "C" fn proxy_on_request_trailers(
1049    context_id: usize,
1050    num_trailers: usize,
1051) -> FilterTrailersStatus {
1052    dispatch(|d| d.on_http_request_trailers(context_id as u32, num_trailers))
1053}
1054
1055#[no_mangle]
1056pub extern "C" fn proxy_on_response_headers(
1057    context_id: usize,
1058    num_headers: usize,
1059    end_of_stream: usize,
1060) -> FilterHeadersStatus {
1061    dispatch(|d| d.on_http_response_headers(context_id as u32, num_headers, end_of_stream != 0))
1062}
1063
1064#[no_mangle]
1065pub extern "C" fn proxy_on_response_body(
1066    context_id: usize,
1067    body_size: usize,
1068    end_of_stream: usize,
1069) -> FilterDataStatus {
1070    dispatch(|d| d.on_http_response_body(context_id as u32, body_size, end_of_stream != 0))
1071}
1072
1073#[no_mangle]
1074pub extern "C" fn proxy_on_response_trailers(
1075    context_id: usize,
1076    num_trailers: usize,
1077) -> FilterTrailersStatus {
1078    dispatch(|d| d.on_http_response_trailers(context_id as u32, num_trailers))
1079}
1080
1081#[no_mangle]
1082pub extern "C" fn proxy_on_http_call_response(
1083    _context_id: usize,
1084    token_id: usize,
1085    num_headers: usize,
1086    body_size: usize,
1087    num_trailers: usize,
1088) {
1089    dispatch(|d| d.on_http_call_response(token_id as u32, num_headers, body_size, num_trailers))
1090}
1091
1092#[cfg(feature = "stream-metadata")]
1093#[no_mangle]
1094pub extern "C" fn proxy_on_grpc_receive_initial_metadata(
1095    _context_id: usize,
1096    token_id: usize,
1097    headers: usize,
1098) {
1099    DISPATCHER
1100        .with_borrow_mut(|d| d.on_grpc_receive_initial_metadata(token_id as u32, headers as u32))
1101}
1102
1103#[no_mangle]
1104pub extern "C" fn proxy_on_grpc_receive(_context_id: usize, token_id: usize, response_size: usize) {
1105    dispatch(|d| d.on_grpc_receive(token_id as u32, response_size))
1106}
1107
1108#[cfg(feature = "stream-metadata")]
1109#[no_mangle]
1110pub extern "C" fn proxy_on_grpc_receive_trailing_metadata(
1111    _context_id: usize,
1112    token_id: usize,
1113    trailers: usize,
1114) {
1115    dispatch(|d| d.on_grpc_receive_trailing_metadata(token_id as usize, trailers as usize))
1116}
1117
1118#[no_mangle]
1119pub extern "C" fn proxy_on_grpc_close(_context_id: usize, token_id: usize, status_code: usize) {
1120    dispatch(|d| d.on_grpc_close(token_id as u32, status_code as u32))
1121}